ProcessMQTT

This commit is contained in:
2025-12-07 16:04:44 +01:00
parent 9d29b94a96
commit 70aa2687d0
4 changed files with 197 additions and 4 deletions

View File

@@ -8,7 +8,7 @@
#define TOPIC "test/topic" #define TOPIC "test/topic"
#define PAYLOAD "Hello MQTT" #define PAYLOAD "Hello MQTT"
#define QOS 1 #define QOS 1
#define TIMEOUT 10000L #define TIMEOUT_MQTT 10000L
int main() { int main() {
MQTTClient client; MQTTClient client;
@@ -33,7 +33,7 @@ int main() {
MQTTClient_deliveryToken token; MQTTClient_deliveryToken token;
MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token); MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
MQTTClient_waitForCompletion(client, token, TIMEOUT); MQTTClient_waitForCompletion(client, token, TIMEOUT_MQTT);
printf("Message publié !\n"); printf("Message publié !\n");
MQTTClient_disconnect(client, 10000); MQTTClient_disconnect(client, 10000);

163
main-mqtt2.cpp Normal file
View File

@@ -0,0 +1,163 @@
#include <iostream>
#include <string>
#include <thread>
#include <atomic>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <csignal>
#include <chrono>
#include <cstring>
#include "mqtt/async_client.h"
using namespace std::chrono_literals;
/* Configuration MQTT */
const std::string ADDRESS = "tcp://rabbitmq:1883";
const std::string CLIENTID = "CppClientTP";
const std::string TOPIC = "geii/ordre/#";
const int QOS = 1;
const int CYCLE_MS = 100;
/* Queue thread-safe */
std::queue<std::string> orders_queue;
std::mutex queue_mtx;
std::string pop_all_and_get_last() {
std::lock_guard<std::mutex> lock(queue_mtx);
if (orders_queue.empty()) return "";
std::string last;
while (!orders_queue.empty()) {
last = orders_queue.front();
orders_queue.pop();
}
return last;
}
void push_order(const std::string &msg) {
std::lock_guard<std::mutex> lock(queue_mtx);
orders_queue.push(msg);
}
/* Etats machine */
enum class MachineState { STOPPED, RUNNING, ESTOP };
std::atomic<MachineState> machine_state(MachineState::STOPPED);
std::atomic<bool> estop_flag(false);
std::atomic<bool> running(true);
/* Fonctions d'application */
void apply_start() {
if(machine_state!=MachineState::RUNNING){
std::cout << "[MACHINE] -> START\n";
machine_state = MachineState::RUNNING;
}
}
void apply_stop() {
if(machine_state!=MachineState::STOPPED){
std::cout << "[MACHINE] -> STOP\n";
machine_state = MachineState::STOPPED;
}
}
void apply_estop() {
if(machine_state!=MachineState::ESTOP){
std::cout << "[MACHINE] -> E-STOP\n";
machine_state = MachineState::ESTOP;
}
}
/* Thread machine */
void machine_thread_fn() {
while(running) {
if(estop_flag) {
apply_estop();
std::this_thread::sleep_for(std::chrono::milliseconds(CYCLE_MS));
continue;
}
std::string last = pop_all_and_get_last();
if(!last.empty()) {
if(last=="START") apply_start();
else if(last=="STOP") apply_stop();
else std::cout << "[MACHINE] Commande inconnue: '" << last << "'\n";
}
std::this_thread::sleep_for(std::chrono::milliseconds(CYCLE_MS));
}
apply_stop();
}
/* Callback MQTT */
class callback : public virtual mqtt::callback {
public:
void message_arrived(mqtt::const_message_ptr msg) override {
std::string payload = msg->to_string();
if(payload == "E_STOP") {
estop_flag = true;
std::cout << "[MQTT] E-STOP reçu\n";
} else {
push_order(payload);
std::cout << "[MQTT] Reçu: '" << payload << "'\n";
}
}
};
/* SIGINT handler */
void sigint_handler(int) {
std::cout << "[MAIN] SIGINT reçu\n";
running = false;
}
int main() {
std::signal(SIGINT, sigint_handler);
/* MQTT async client */
mqtt::async_client client(ADDRESS, CLIENTID);
callback cb;
client.set_callback(cb);
mqtt::connect_options connOpts;
connOpts.set_clean_session(true);
connOpts.set_user_name("admin");
connOpts.set_password("ChangeMe");
try {
client.connect(connOpts)->wait();
client.start_consuming();
client.subscribe(TOPIC, QOS)->wait();
} catch (const mqtt::exception &exc) {
std::cerr << "Erreur MQTT: " << exc.what() << "\n";
return 1;
}
/* Threads */
std::thread th_machine(machine_thread_fn);
/* Boucle principale pour la réception */
while(running) {
std::string payload = R"({
"order": "STATUS",
"speed": 120,
"temperature": 36.1
})";
auto msg = mqtt::make_message("geii/telemetry", payload);
msg->set_qos(1);
client.publish(msg);
std::this_thread::sleep_for(100ms);
}
/* Arrêt */
try {
client.unsubscribe(TOPIC)->wait();
client.stop_consuming();
client.disconnect()->wait();
} catch(const mqtt::exception &exc){
std::cerr << "Erreur déconnexion MQTT: " << exc.what() << "\n";
}
th_machine.join();
std::cout << "[MAIN] Terminé\n";
return 0;
}

View File

@@ -109,14 +109,26 @@ int main()
Actions(); Actions();
ProcessPrometheus(); ProcessPrometheus();
ProcessMQTT(&client);
ProcessException(); ProcessException();
usleep(100000); usleep(100000);
} }
endwin(); // Termine ncurses et rétablit le terminal endwin(); // Termine ncurses et rétablit le terminal
puts("Fin du programme");
/* Arrêt */
try {
client.unsubscribe(TOPIC)->wait();
client.stop_consuming();
client.disconnect()->wait();
} catch(const mqtt::exception &exc){
std::cerr << "Erreur déconnexion MQTT: " << exc.what() << "\n";
}
//th_machine.join();
std::cout << "[MAIN] Terminé\n";
return 0; return 0;
} }
@@ -859,6 +871,19 @@ void InitPrometheus()
tank_histogram = &hist_volume.Add({}, buckets); tank_histogram = &hist_volume.Add({}, buckets);
} }
void ProcessMQTT(mqtt::async_client* client)
{
std::string payload = R"({
"order": "STATUS",
"speed": 120,
"temperature": 36.1
})";
auto msg = mqtt::make_message("geii/telemetry", payload);
msg->set_qos(1);
client->publish(msg);
}
void ProcessPrometheus() void ProcessPrometheus()
{ {
tank_gauge->Set(_digital[IN_TANK_LEVEL].dvalue); tank_gauge->Set(_digital[IN_TANK_LEVEL].dvalue);

View File

@@ -1,3 +1,6 @@
#undef timeout
#include "mqtt/async_client.h"
void ConsoleInit(); void ConsoleInit();
void LireClavier(int ch); void LireClavier(int ch);
@@ -16,6 +19,8 @@ void Process();
void InitPrometheus(); void InitPrometheus();
void ProcessPrometheus(); void ProcessPrometheus();
void ProcessMQTT(mqtt::async_client* client);
double SimulConsoSinusoidale(long t); double SimulConsoSinusoidale(long t);
double SimulConsoBrown(double valeur_precedente); double SimulConsoBrown(double valeur_precedente);