From 3ce3cecc343f1092a54314f9835438216e962527 Mon Sep 17 00:00:00 2001 From: medina5 Date: Sun, 7 Dec 2025 16:48:16 +0100 Subject: [PATCH] JSON --- main.cpp | 181 ++++++++++++++++++++++++++++++-- thread.c | 315 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 490 insertions(+), 6 deletions(-) create mode 100644 thread.c diff --git a/main.cpp b/main.cpp index 4fe60a1..a67d5c4 100644 --- a/main.cpp +++ b/main.cpp @@ -27,15 +27,25 @@ #undef timeout #include "mqtt/async_client.h" +#include + using namespace std::chrono_literals; +using json = nlohmann::json; // Constantes de fonctionnement #define LEVEL_MIN 2 #define FLOW_PER_PUMP 150 +/* Configuration MQTT */ +const std::string ADDRESS = "tcp://rabbitmq:1883"; +const std::string CLIENTID = "CppClientTP"; +const std::string TOPIC = "geii/ordre/#"; +const int QOS = 1; +const int CYCLE_MS = 100; + WINDOW *window; -int etape = 10; // Étape du grafcet : début Automatique +int etape = 0; // Étape du grafcet : début Automatique int bp_mode, bp_mode_fm; unsigned short pompe1, pompe2, pompe3, pompe4; // bouton des pompes 0 (arrêt) / 1 (marche) unsigned short pompe1_old, pompe2_old, pompe3_old, pompe4_old; @@ -74,8 +84,161 @@ Histogram::BucketBoundaries buckets = { Histogram *tank_histogram = nullptr; // ************************************************************ + +/* Queue thread-safe */ +std::queue orders_queue; +std::mutex queue_mtx; + +std::string pop_all_and_get_last() { + std::lock_guard lock(queue_mtx); + if (orders_queue.empty()) return ""; + std::string last; + while (!orders_queue.empty()) { + last = orders_queue.front(); + orders_queue.pop(); + } + return last; +} + +void push_order(const std::string &msg) { + std::lock_guard lock(queue_mtx); + orders_queue.push(msg); +} + +/* Etats machine */ +enum class MachineState { STOPPED, RUNNING, ESTOP }; +std::atomic machine_state(MachineState::STOPPED); +std::atomic estop_flag(false); +std::atomic running(true); + +/* Fonctions d'application */ +void apply_start() { + if(machine_state!=MachineState::RUNNING){ + std::cout << "[MACHINE] -> START\n"; + machine_state = MachineState::RUNNING; + } +} + +void apply_stop() { + if(machine_state!=MachineState::STOPPED){ + std::cout << "[MACHINE] -> STOP\n"; + machine_state = MachineState::STOPPED; + } +} + +void apply_estop() { + if(machine_state!=MachineState::ESTOP){ + std::cout << "[MACHINE] -> E-STOP\n"; + machine_state = MachineState::ESTOP; + } +} + +/* Thread machine */ +void machine_thread_fn() { + while(running) { + if(estop_flag) { + apply_estop(); + std::this_thread::sleep_for(std::chrono::milliseconds(CYCLE_MS)); + continue; + } + pompe1 = 1; + std::string last = pop_all_and_get_last(); + if(!last.empty()) { + if(last=="START") apply_start(); + if(last=="P1") { + pompe1 = 1; + } else if(last=="P2") { + pompe2 = 1; + } else if(last=="P3") { + pompe3 = 1; + } else if(last=="P4") { + pompe4 = 1; + } + else if(last=="STOP") apply_stop(); + else std::cout << "[MACHINE] Commande inconnue: '" << last << "'\n"; + } + std::this_thread::sleep_for(std::chrono::milliseconds(CYCLE_MS)); + } + apply_stop(); +} + +/* Callback MQTT */ +class callback : public virtual mqtt::callback { +public: + void message_arrived(mqtt::const_message_ptr msg) override { + std::string payload = msg->to_string(); + if(payload == "E_STOP") { + estop_flag = true; + std::cout << "[MQTT] E-STOP reçu\n"; + } else if(payload == "P1") { + pompe2 = 1; + } else { + push_order(payload); + std::cout << "[MQTT] Reçu: '" << payload << "'\n"; + } + } +}; + +/* SIGINT handler */ +void sigint_handler(int) { + std::cout << "[MAIN] SIGINT reçu\n"; + running = false; +} + +void send_to_influx(double value) { + CURL* curl = curl_easy_init(); + if (!curl) { + std::cerr << "Erreur CURL\n"; + return; + } + + // Line protocol + std::string line = "machine_cycle,machine=convoyeur1 value=" + std::to_string(value); + + // Endpoint InfluxDB 2.x + std::string url = + "http://influxdb:8086/api/v2/write?org=geii&bucket=mesures&precision=s"; + + struct curl_slist* headers = nullptr; + headers = curl_slist_append(headers, "Authorization: Token MON_TOKEN"); + headers = curl_slist_append(headers, "Content-Type: text/plain"); + + curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, line.c_str()); + + CURLcode res = curl_easy_perform(curl); + if (res != CURLE_OK) { + std::cerr << "Erreur CURL: " << curl_easy_strerror(res) << "\n"; + } + + curl_slist_free_all(headers); + curl_easy_cleanup(curl); +} + + int main() { + std::signal(SIGINT, sigint_handler); + + /* MQTT async client */ + mqtt::async_client client(ADDRESS, CLIENTID); + callback cb; + client.set_callback(cb); + + mqtt::connect_options connOpts; + connOpts.set_clean_session(true); + connOpts.set_user_name("admin"); + connOpts.set_password("ChangeMe"); + try { + client.connect(connOpts)->wait(); + client.start_consuming(); + client.subscribe(TOPIC, QOS)->wait(); + } catch (const mqtt::exception &exc) { + std::cerr << "Erreur MQTT: " << exc.what() << "\n"; + return 1; + } + /* Initialisation */ ConsoleInit(); AffichageWindow(); @@ -873,11 +1036,17 @@ void InitPrometheus() void ProcessMQTT(mqtt::async_client* client) { - std::string payload = R"({ - "order": "STATUS", - "speed": 120, - "temperature": 36.1 - })"; + json obj = { + {"entree", _digital[IN_FLOW_IN].dvalue}, + {"sortie", _digital[IN_FLOW_OUT].dvalue}, + {"p1", _digital[IN_FLOW_1].dvalue}, + {"p2", _digital[IN_FLOW_2].dvalue}, + {"p3", _digital[IN_FLOW_3].dvalue}, + {"p4", _digital[IN_FLOW_4].dvalue}, + {"level", _digital[IN_TANK_LEVEL].dvalue} + }; + + std::string payload = obj.dump(); auto msg = mqtt::make_message("geii/telemetry", payload); msg->set_qos(1); diff --git a/thread.c b/thread.c new file mode 100644 index 0000000..46d8bbb --- /dev/null +++ b/thread.c @@ -0,0 +1,315 @@ +/* + * amqp_machine.c + * + * Exemple multithread AMQP (rabbitmq-c) + boucle machine déterministe. + * + * Compilation : + * gcc amqp_machine.c -lrabbitmq -lpthread -o amqp_machine + * + * Pré-requis : + * librabbitmq-dev (rabbitmq-c) + * + * Test de publication : + * rabbitmqadmin publish exchange=amq.default routing_key=geii_orders payload="START" + * rabbitmqadmin publish exchange=amq.default routing_key=geii_orders payload="STOP" + * rabbitmqadmin publish exchange=amq.default routing_key=geii_orders payload="E_STOP" + * + */ + + /* +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +// Connexion AMQP +#define HOST "localhost" +#define PORT 5672 +#define USER "guest" +#define PASS "guest" +#define VHOST "/" +#define QUEUE "geii_orders" + +// Boucle machine +#define CYCLE_MS 100 + +// Structures de la queue interne +typedef struct node { + char *msg; + struct node *next; +} node_t; + +typedef struct { + node_t *head; + node_t *tail; + pthread_mutex_t mtx; +} queue_t; + +static void queue_init(queue_t *q) { + q->head = q->tail = NULL; + pthread_mutex_init(&q->mtx, NULL); +} + +static void queue_push(queue_t *q, const char *s) { + node_t *n = malloc(sizeof(node_t)); + n->msg = strdup(s); + n->next = NULL; + pthread_mutex_lock(&q->mtx); + if (q->tail) q->tail->next = n; + q->tail = n; + if (!q->head) q->head = n; + pthread_mutex_unlock(&q->mtx); +} + +static char *queue_pop_all_and_get_last(queue_t *q) { + // Vide la queue et retourne la dernière chaine (caller doit free()). + // Retourne NULL si queue vide. + pthread_mutex_lock(&q->mtx); + if (!q->head) { + pthread_mutex_unlock(&q->mtx); + return NULL; + } + char *last = NULL; + node_t *cur = q->head; + while (cur) { + if (last) free(last); + last = strdup(cur->msg); + node_t *tmp = cur; + cur = cur->next; + free(tmp->msg); + free(tmp); + } + q->head = q->tail = NULL; + pthread_mutex_unlock(&q->mtx); + return last; +} + +static void queue_destroy(queue_t *q) { + pthread_mutex_lock(&q->mtx); + node_t *cur = q->head; + while (cur) { + node_t *tmp = cur; + cur = cur->next; + free(tmp->msg); + free(tmp); + } + q->head = q->tail = NULL; + pthread_mutex_unlock(&q->mtx); + pthread_mutex_destroy(&q->mtx); +} + +// Etats machine +typedef enum { + STATE_STOPPED, + STATE_RUNNING, + STATE_ESTOP +} machine_state_t; + +// Variables globales de contrôle +static atomic_int running = 1; // 0 = arrêt demandé +static atomic_int estop_flag = 0; // 1 = E-STOP actif +static queue_t orders_queue; +static machine_state_t machine_state = STATE_STOPPED; + +// Fonctions "appliquer" (à remplacer par actions réelles) +static void apply_start(void) { + if (machine_state != STATE_RUNNING) { + printf("[MACHINE] -> START\n"); + // TODO: démarrer moteurs / sorties + machine_state = STATE_RUNNING; + } +} + +static void apply_stop(void) { + if (machine_state != STATE_STOPPED) { + printf("[MACHINE] -> STOP\n"); + // TODO: couper moteurs / sorties + machine_state = STATE_STOPPED; + } +} + +static void apply_estop(void) { + if (machine_state != STATE_ESTOP) { + printf("[MACHINE] -> E-STOP (arrêt d'urgence)\n"); + // TODO: couper puissance, sécurité + machine_state = STATE_ESTOP; + } +} + +// Thread machine : boucle déterministe +void *machine_thread_fn(void *arg) { + (void)arg; + const int cycle_us = CYCLE_MS * 1000; + while (atomic_load(&running)) { + // 1) Vérifier estop immédiat + if (atomic_load(&estop_flag)) { + apply_estop(); + // On peut décider ici de vider la queue ou de la garder + // mais on ignore les autres commandes jusqu'à reset. + usleep(cycle_us); + continue; + } + + // 2) Dépiler toute la file et ne garder que la dernière commande + char *last = queue_pop_all_and_get_last(&orders_queue); + if (last) { + // Normaliser message (trim) + if (strcmp(last, "START") == 0) { + apply_start(); + } else if (strcmp(last, "STOP") == 0) { + apply_stop(); + } else { + printf("[MACHINE] Commande inconnue reçue: '%s'\n", last); + } + free(last); + } + + // 3) Exécuter la logique machine déterministe (boucle cycle) + // TODO: lire capteurs, asservissements, sorties... + + usleep(cycle_us); + } + + // Fin : safe stop + apply_stop(); + return NULL; +} + +// Thread AMQP : consomme messages et les place dans la queue +// Si message == "E_STOP" => set estop_flag immédiatement (ne pas mettre en queue) + +void *amqp_thread_fn(void *arg) { + (void)arg; + amqp_connection_state_t conn = amqp_new_connection(); + amqp_socket_t *socket = amqp_tcp_socket_new(conn); + if (!socket) { + fprintf(stderr, "[AMQP] Erreur: création socket\n"); + atomic_store(&running, 0); + return NULL; + } + if (amqp_socket_open(socket, HOST, PORT)) { + fprintf(stderr, "[AMQP] Erreur: ouverture connexion %s:%d\n", HOST, PORT); + atomic_store(&running, 0); + return NULL; + } + + amqp_rpc_reply_t rpc_reply = amqp_login(conn, VHOST, 0, 131072, 60, + AMQP_SASL_METHOD_PLAIN, USER, PASS); + if (rpc_reply.reply_type != AMQP_RESPONSE_NORMAL) { + fprintf(stderr, "[AMQP] Erreur login\n"); + atomic_store(&running, 0); + return NULL; + } + + amqp_channel_open(conn, 1); + amqp_get_rpc_reply(conn); + + // Déclarer la queue (idempotent) + amqp_queue_declare(conn, 1, amqp_cstring_bytes(QUEUE), + 0, 0, 0, 1, amqp_empty_table()); + amqp_get_rpc_reply(conn); + + // Démarrer la consommation + amqp_basic_consume(conn, 1, + amqp_cstring_bytes(QUEUE), + amqp_empty_bytes, // consumer tag auto + 0, // no_local + 1, // no_ack = 1 (auto-ack) ; ajuster selon besoin + 0, // exclusive + amqp_empty_table()); + amqp_get_rpc_reply(conn); + + printf("[AMQP] En attente de messages sur '%s'...\n", QUEUE); + + while (atomic_load(&running)) { + amqp_envelope_t envelope; + amqp_maybe_release_buffers(conn); + + struct timeval timeout; + timeout.tv_sec = 0; + timeout.tv_usec = 500000; // 500 ms + + rpc_reply = amqp_consume_message(conn, &envelope, &timeout, 0); + + if (rpc_reply.reply_type == AMQP_RESPONSE_NORMAL) { + // Message reçu + size_t len = envelope.message.body.len; + char *body = malloc(len + 1); + memcpy(body, envelope.message.body.bytes, len); + body[len] = '\0'; + + printf("[AMQP] Reçu: '%s'\n", body); + + if (strcmp(body, "E_STOP") == 0) { + // Priorité absolue : traiter immédiatement + atomic_store(&estop_flag, 1); + printf("[AMQP] E-STOP reçu : flag estop activé\n"); + free(body); + } else { + // Push dans la queue pour traitement au prochain cycle + queue_push(&orders_queue, body); + free(body); + } + + amqp_destroy_envelope(&envelope); + } else if (rpc_reply.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && + rpc_reply.library_error == AMQP_STATUS_TIMEOUT) { + // timeout - pas de message + // rien + } else { + // autre erreur - tentatives de reconnexion possibles + fprintf(stderr, "[AMQP] Erreur consume (reconnexion nécessaire?)\n"); + sleep(1); + // Ici on pourrait tenter de reconnecter proprement ; pour cet exemple, + // on continue la boucle et laisse l'admin redémarrer si nécessaire + } + } + + // Fermeture + amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS); + amqp_connection_close(conn, AMQP_REPLY_SUCCESS); + amqp_destroy_connection(conn); + return NULL; +} + +// Handler SIGINT pour arrêt propre +static void sigint_handler(int signum) { + (void)signum; + printf("[MAIN] SIGINT reçu : arrêt\n"); + atomic_store(&running, 0); +} + +int main(int argc, char **argv) { + (void)argc; (void)argv; + signal(SIGINT, sigint_handler); + + queue_init(&orders_queue); + + pthread_t th_amqp, th_machine; + if (pthread_create(&th_amqp, NULL, amqp_thread_fn, NULL) != 0) { + perror("pthread_create amqp"); + return 1; + } + if (pthread_create(&th_machine, NULL, machine_thread_fn, NULL) != 0) { + perror("pthread_create machine"); + atomic_store(&running, 0); + pthread_join(th_amqp, NULL); + return 1; + } + + // Attendre threads + pthread_join(th_amqp, NULL); + pthread_join(th_machine, NULL); + + queue_destroy(&orders_queue); + + printf("[MAIN] Terminé.\n"); + return 0; +} +*/