From 70aa2687d061fffb87a9280966e0c57981dad50c Mon Sep 17 00:00:00 2001 From: medina5 Date: Sun, 7 Dec 2025 16:04:44 +0100 Subject: [PATCH] ProcessMQTT --- main-mqtt.cpp | 4 +- main-mqtt2.cpp | 163 +++++++++++++++++++++++++++++++++++++++++++++++++ main.cpp | 29 ++++++++- main.hpp | 5 ++ 4 files changed, 197 insertions(+), 4 deletions(-) create mode 100644 main-mqtt2.cpp diff --git a/main-mqtt.cpp b/main-mqtt.cpp index c2ed3e4..c7914b7 100644 --- a/main-mqtt.cpp +++ b/main-mqtt.cpp @@ -8,7 +8,7 @@ #define TOPIC "test/topic" #define PAYLOAD "Hello MQTT" #define QOS 1 -#define TIMEOUT 10000L +#define TIMEOUT_MQTT 10000L int main() { MQTTClient client; @@ -33,7 +33,7 @@ int main() { MQTTClient_deliveryToken token; MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token); - MQTTClient_waitForCompletion(client, token, TIMEOUT); + MQTTClient_waitForCompletion(client, token, TIMEOUT_MQTT); printf("Message publié !\n"); MQTTClient_disconnect(client, 10000); diff --git a/main-mqtt2.cpp b/main-mqtt2.cpp new file mode 100644 index 0000000..acf6850 --- /dev/null +++ b/main-mqtt2.cpp @@ -0,0 +1,163 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "mqtt/async_client.h" + +using namespace std::chrono_literals; + +/* Configuration MQTT */ +const std::string ADDRESS = "tcp://rabbitmq:1883"; +const std::string CLIENTID = "CppClientTP"; +const std::string TOPIC = "geii/ordre/#"; +const int QOS = 1; +const int CYCLE_MS = 100; + +/* Queue thread-safe */ +std::queue orders_queue; +std::mutex queue_mtx; + +std::string pop_all_and_get_last() { + std::lock_guard lock(queue_mtx); + if (orders_queue.empty()) return ""; + std::string last; + while (!orders_queue.empty()) { + last = orders_queue.front(); + orders_queue.pop(); + } + return last; +} + +void push_order(const std::string &msg) { + std::lock_guard lock(queue_mtx); + orders_queue.push(msg); +} + +/* Etats machine */ +enum class MachineState { STOPPED, RUNNING, ESTOP }; +std::atomic machine_state(MachineState::STOPPED); +std::atomic estop_flag(false); +std::atomic running(true); + +/* Fonctions d'application */ +void apply_start() { + if(machine_state!=MachineState::RUNNING){ + std::cout << "[MACHINE] -> START\n"; + machine_state = MachineState::RUNNING; + } +} + +void apply_stop() { + if(machine_state!=MachineState::STOPPED){ + std::cout << "[MACHINE] -> STOP\n"; + machine_state = MachineState::STOPPED; + } +} + +void apply_estop() { + if(machine_state!=MachineState::ESTOP){ + std::cout << "[MACHINE] -> E-STOP\n"; + machine_state = MachineState::ESTOP; + } +} + +/* Thread machine */ +void machine_thread_fn() { + while(running) { + if(estop_flag) { + apply_estop(); + std::this_thread::sleep_for(std::chrono::milliseconds(CYCLE_MS)); + continue; + } + std::string last = pop_all_and_get_last(); + if(!last.empty()) { + if(last=="START") apply_start(); + else if(last=="STOP") apply_stop(); + else std::cout << "[MACHINE] Commande inconnue: '" << last << "'\n"; + } + std::this_thread::sleep_for(std::chrono::milliseconds(CYCLE_MS)); + } + apply_stop(); +} + +/* Callback MQTT */ +class callback : public virtual mqtt::callback { +public: + void message_arrived(mqtt::const_message_ptr msg) override { + std::string payload = msg->to_string(); + if(payload == "E_STOP") { + estop_flag = true; + std::cout << "[MQTT] E-STOP reçu\n"; + } else { + push_order(payload); + std::cout << "[MQTT] Reçu: '" << payload << "'\n"; + } + } +}; + +/* SIGINT handler */ +void sigint_handler(int) { + std::cout << "[MAIN] SIGINT reçu\n"; + running = false; +} + +int main() { + std::signal(SIGINT, sigint_handler); + + /* MQTT async client */ + mqtt::async_client client(ADDRESS, CLIENTID); + callback cb; + client.set_callback(cb); + + mqtt::connect_options connOpts; + connOpts.set_clean_session(true); + connOpts.set_user_name("admin"); + connOpts.set_password("ChangeMe"); + try { + client.connect(connOpts)->wait(); + client.start_consuming(); + client.subscribe(TOPIC, QOS)->wait(); + } catch (const mqtt::exception &exc) { + std::cerr << "Erreur MQTT: " << exc.what() << "\n"; + return 1; + } + + /* Threads */ + std::thread th_machine(machine_thread_fn); + + /* Boucle principale pour la réception */ + while(running) { + + std::string payload = R"({ + "order": "STATUS", + "speed": 120, + "temperature": 36.1 + })"; + + auto msg = mqtt::make_message("geii/telemetry", payload); + msg->set_qos(1); + client.publish(msg); + + std::this_thread::sleep_for(100ms); + } + + /* Arrêt */ + try { + client.unsubscribe(TOPIC)->wait(); + client.stop_consuming(); + client.disconnect()->wait(); + } catch(const mqtt::exception &exc){ + std::cerr << "Erreur déconnexion MQTT: " << exc.what() << "\n"; + } + + th_machine.join(); + + std::cout << "[MAIN] Terminé\n"; + return 0; +} diff --git a/main.cpp b/main.cpp index 4531d68..4fe60a1 100644 --- a/main.cpp +++ b/main.cpp @@ -109,14 +109,26 @@ int main() Actions(); ProcessPrometheus(); - + ProcessMQTT(&client); ProcessException(); usleep(100000); } endwin(); // Termine ncurses et rétablit le terminal - puts("Fin du programme"); + + /* Arrêt */ + try { + client.unsubscribe(TOPIC)->wait(); + client.stop_consuming(); + client.disconnect()->wait(); + } catch(const mqtt::exception &exc){ + std::cerr << "Erreur déconnexion MQTT: " << exc.what() << "\n"; + } + + //th_machine.join(); + + std::cout << "[MAIN] Terminé\n"; return 0; } @@ -859,6 +871,19 @@ void InitPrometheus() tank_histogram = &hist_volume.Add({}, buckets); } +void ProcessMQTT(mqtt::async_client* client) +{ + std::string payload = R"({ + "order": "STATUS", + "speed": 120, + "temperature": 36.1 + })"; + + auto msg = mqtt::make_message("geii/telemetry", payload); + msg->set_qos(1); + client->publish(msg); +} + void ProcessPrometheus() { tank_gauge->Set(_digital[IN_TANK_LEVEL].dvalue); diff --git a/main.hpp b/main.hpp index 73a7cdc..4f0ff79 100644 --- a/main.hpp +++ b/main.hpp @@ -1,3 +1,6 @@ +#undef timeout +#include "mqtt/async_client.h" + void ConsoleInit(); void LireClavier(int ch); @@ -16,6 +19,8 @@ void Process(); void InitPrometheus(); void ProcessPrometheus(); +void ProcessMQTT(mqtt::async_client* client); + double SimulConsoSinusoidale(long t); double SimulConsoBrown(double valeur_precedente);