/* * amqp_machine.c * * Exemple multithread AMQP (rabbitmq-c) + boucle machine déterministe. * * Compilation : * gcc amqp_machine.c -lrabbitmq -lpthread -o amqp_machine * * Pré-requis : * librabbitmq-dev (rabbitmq-c) * * Test de publication : * rabbitmqadmin publish exchange=amq.default routing_key=geii_orders payload="START" * rabbitmqadmin publish exchange=amq.default routing_key=geii_orders payload="STOP" * rabbitmqadmin publish exchange=amq.default routing_key=geii_orders payload="E_STOP" * */ /* #include #include #include #include #include #include #include #include #include // Connexion AMQP #define HOST "localhost" #define PORT 5672 #define USER "guest" #define PASS "guest" #define VHOST "/" #define QUEUE "geii_orders" // Boucle machine #define CYCLE_MS 100 // Structures de la queue interne typedef struct node { char *msg; struct node *next; } node_t; typedef struct { node_t *head; node_t *tail; pthread_mutex_t mtx; } queue_t; static void queue_init(queue_t *q) { q->head = q->tail = NULL; pthread_mutex_init(&q->mtx, NULL); } static void queue_push(queue_t *q, const char *s) { node_t *n = malloc(sizeof(node_t)); n->msg = strdup(s); n->next = NULL; pthread_mutex_lock(&q->mtx); if (q->tail) q->tail->next = n; q->tail = n; if (!q->head) q->head = n; pthread_mutex_unlock(&q->mtx); } static char *queue_pop_all_and_get_last(queue_t *q) { // Vide la queue et retourne la dernière chaine (caller doit free()). // Retourne NULL si queue vide. pthread_mutex_lock(&q->mtx); if (!q->head) { pthread_mutex_unlock(&q->mtx); return NULL; } char *last = NULL; node_t *cur = q->head; while (cur) { if (last) free(last); last = strdup(cur->msg); node_t *tmp = cur; cur = cur->next; free(tmp->msg); free(tmp); } q->head = q->tail = NULL; pthread_mutex_unlock(&q->mtx); return last; } static void queue_destroy(queue_t *q) { pthread_mutex_lock(&q->mtx); node_t *cur = q->head; while (cur) { node_t *tmp = cur; cur = cur->next; free(tmp->msg); free(tmp); } q->head = q->tail = NULL; pthread_mutex_unlock(&q->mtx); pthread_mutex_destroy(&q->mtx); } // Etats machine typedef enum { STATE_STOPPED, STATE_RUNNING, STATE_ESTOP } machine_state_t; // Variables globales de contrôle static atomic_int running = 1; // 0 = arrêt demandé static atomic_int estop_flag = 0; // 1 = E-STOP actif static queue_t orders_queue; static machine_state_t machine_state = STATE_STOPPED; // Fonctions "appliquer" (à remplacer par actions réelles) static void apply_start(void) { if (machine_state != STATE_RUNNING) { printf("[MACHINE] -> START\n"); // TODO: démarrer moteurs / sorties machine_state = STATE_RUNNING; } } static void apply_stop(void) { if (machine_state != STATE_STOPPED) { printf("[MACHINE] -> STOP\n"); // TODO: couper moteurs / sorties machine_state = STATE_STOPPED; } } static void apply_estop(void) { if (machine_state != STATE_ESTOP) { printf("[MACHINE] -> E-STOP (arrêt d'urgence)\n"); // TODO: couper puissance, sécurité machine_state = STATE_ESTOP; } } // Thread machine : boucle déterministe void *machine_thread_fn(void *arg) { (void)arg; const int cycle_us = CYCLE_MS * 1000; while (atomic_load(&running)) { // 1) Vérifier estop immédiat if (atomic_load(&estop_flag)) { apply_estop(); // On peut décider ici de vider la queue ou de la garder // mais on ignore les autres commandes jusqu'à reset. usleep(cycle_us); continue; } // 2) Dépiler toute la file et ne garder que la dernière commande char *last = queue_pop_all_and_get_last(&orders_queue); if (last) { // Normaliser message (trim) if (strcmp(last, "START") == 0) { apply_start(); } else if (strcmp(last, "STOP") == 0) { apply_stop(); } else { printf("[MACHINE] Commande inconnue reçue: '%s'\n", last); } free(last); } // 3) Exécuter la logique machine déterministe (boucle cycle) // TODO: lire capteurs, asservissements, sorties... usleep(cycle_us); } // Fin : safe stop apply_stop(); return NULL; } // Thread AMQP : consomme messages et les place dans la queue // Si message == "E_STOP" => set estop_flag immédiatement (ne pas mettre en queue) void *amqp_thread_fn(void *arg) { (void)arg; amqp_connection_state_t conn = amqp_new_connection(); amqp_socket_t *socket = amqp_tcp_socket_new(conn); if (!socket) { fprintf(stderr, "[AMQP] Erreur: création socket\n"); atomic_store(&running, 0); return NULL; } if (amqp_socket_open(socket, HOST, PORT)) { fprintf(stderr, "[AMQP] Erreur: ouverture connexion %s:%d\n", HOST, PORT); atomic_store(&running, 0); return NULL; } amqp_rpc_reply_t rpc_reply = amqp_login(conn, VHOST, 0, 131072, 60, AMQP_SASL_METHOD_PLAIN, USER, PASS); if (rpc_reply.reply_type != AMQP_RESPONSE_NORMAL) { fprintf(stderr, "[AMQP] Erreur login\n"); atomic_store(&running, 0); return NULL; } amqp_channel_open(conn, 1); amqp_get_rpc_reply(conn); // Déclarer la queue (idempotent) amqp_queue_declare(conn, 1, amqp_cstring_bytes(QUEUE), 0, 0, 0, 1, amqp_empty_table()); amqp_get_rpc_reply(conn); // Démarrer la consommation amqp_basic_consume(conn, 1, amqp_cstring_bytes(QUEUE), amqp_empty_bytes, // consumer tag auto 0, // no_local 1, // no_ack = 1 (auto-ack) ; ajuster selon besoin 0, // exclusive amqp_empty_table()); amqp_get_rpc_reply(conn); printf("[AMQP] En attente de messages sur '%s'...\n", QUEUE); while (atomic_load(&running)) { amqp_envelope_t envelope; amqp_maybe_release_buffers(conn); struct timeval timeout; timeout.tv_sec = 0; timeout.tv_usec = 500000; // 500 ms rpc_reply = amqp_consume_message(conn, &envelope, &timeout, 0); if (rpc_reply.reply_type == AMQP_RESPONSE_NORMAL) { // Message reçu size_t len = envelope.message.body.len; char *body = malloc(len + 1); memcpy(body, envelope.message.body.bytes, len); body[len] = '\0'; printf("[AMQP] Reçu: '%s'\n", body); if (strcmp(body, "E_STOP") == 0) { // Priorité absolue : traiter immédiatement atomic_store(&estop_flag, 1); printf("[AMQP] E-STOP reçu : flag estop activé\n"); free(body); } else { // Push dans la queue pour traitement au prochain cycle queue_push(&orders_queue, body); free(body); } amqp_destroy_envelope(&envelope); } else if (rpc_reply.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && rpc_reply.library_error == AMQP_STATUS_TIMEOUT) { // timeout - pas de message // rien } else { // autre erreur - tentatives de reconnexion possibles fprintf(stderr, "[AMQP] Erreur consume (reconnexion nécessaire?)\n"); sleep(1); // Ici on pourrait tenter de reconnecter proprement ; pour cet exemple, // on continue la boucle et laisse l'admin redémarrer si nécessaire } } // Fermeture amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS); amqp_connection_close(conn, AMQP_REPLY_SUCCESS); amqp_destroy_connection(conn); return NULL; } // Handler SIGINT pour arrêt propre static void sigint_handler(int signum) { (void)signum; printf("[MAIN] SIGINT reçu : arrêt\n"); atomic_store(&running, 0); } int main(int argc, char **argv) { (void)argc; (void)argv; signal(SIGINT, sigint_handler); queue_init(&orders_queue); pthread_t th_amqp, th_machine; if (pthread_create(&th_amqp, NULL, amqp_thread_fn, NULL) != 0) { perror("pthread_create amqp"); return 1; } if (pthread_create(&th_machine, NULL, machine_thread_fn, NULL) != 0) { perror("pthread_create machine"); atomic_store(&running, 0); pthread_join(th_amqp, NULL); return 1; } // Attendre threads pthread_join(th_amqp, NULL); pthread_join(th_machine, NULL); queue_destroy(&orders_queue); printf("[MAIN] Terminé.\n"); return 0; } */