JSON
This commit is contained in:
315
thread.c
Normal file
315
thread.c
Normal file
@@ -0,0 +1,315 @@
|
||||
/*
|
||||
* amqp_machine.c
|
||||
*
|
||||
* Exemple multithread AMQP (rabbitmq-c) + boucle machine déterministe.
|
||||
*
|
||||
* Compilation :
|
||||
* gcc amqp_machine.c -lrabbitmq -lpthread -o amqp_machine
|
||||
*
|
||||
* Pré-requis :
|
||||
* librabbitmq-dev (rabbitmq-c)
|
||||
*
|
||||
* Test de publication :
|
||||
* rabbitmqadmin publish exchange=amq.default routing_key=geii_orders payload="START"
|
||||
* rabbitmqadmin publish exchange=amq.default routing_key=geii_orders payload="STOP"
|
||||
* rabbitmqadmin publish exchange=amq.default routing_key=geii_orders payload="E_STOP"
|
||||
*
|
||||
*/
|
||||
|
||||
/*
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
#include <signal.h>
|
||||
#include <pthread.h>
|
||||
#include <stdatomic.h>
|
||||
|
||||
#include <amqp.h>
|
||||
#include <amqp_tcp_socket.h>
|
||||
|
||||
// Connexion AMQP
|
||||
#define HOST "localhost"
|
||||
#define PORT 5672
|
||||
#define USER "guest"
|
||||
#define PASS "guest"
|
||||
#define VHOST "/"
|
||||
#define QUEUE "geii_orders"
|
||||
|
||||
// Boucle machine
|
||||
#define CYCLE_MS 100
|
||||
|
||||
// Structures de la queue interne
|
||||
typedef struct node {
|
||||
char *msg;
|
||||
struct node *next;
|
||||
} node_t;
|
||||
|
||||
typedef struct {
|
||||
node_t *head;
|
||||
node_t *tail;
|
||||
pthread_mutex_t mtx;
|
||||
} queue_t;
|
||||
|
||||
static void queue_init(queue_t *q) {
|
||||
q->head = q->tail = NULL;
|
||||
pthread_mutex_init(&q->mtx, NULL);
|
||||
}
|
||||
|
||||
static void queue_push(queue_t *q, const char *s) {
|
||||
node_t *n = malloc(sizeof(node_t));
|
||||
n->msg = strdup(s);
|
||||
n->next = NULL;
|
||||
pthread_mutex_lock(&q->mtx);
|
||||
if (q->tail) q->tail->next = n;
|
||||
q->tail = n;
|
||||
if (!q->head) q->head = n;
|
||||
pthread_mutex_unlock(&q->mtx);
|
||||
}
|
||||
|
||||
static char *queue_pop_all_and_get_last(queue_t *q) {
|
||||
// Vide la queue et retourne la dernière chaine (caller doit free()).
|
||||
// Retourne NULL si queue vide.
|
||||
pthread_mutex_lock(&q->mtx);
|
||||
if (!q->head) {
|
||||
pthread_mutex_unlock(&q->mtx);
|
||||
return NULL;
|
||||
}
|
||||
char *last = NULL;
|
||||
node_t *cur = q->head;
|
||||
while (cur) {
|
||||
if (last) free(last);
|
||||
last = strdup(cur->msg);
|
||||
node_t *tmp = cur;
|
||||
cur = cur->next;
|
||||
free(tmp->msg);
|
||||
free(tmp);
|
||||
}
|
||||
q->head = q->tail = NULL;
|
||||
pthread_mutex_unlock(&q->mtx);
|
||||
return last;
|
||||
}
|
||||
|
||||
static void queue_destroy(queue_t *q) {
|
||||
pthread_mutex_lock(&q->mtx);
|
||||
node_t *cur = q->head;
|
||||
while (cur) {
|
||||
node_t *tmp = cur;
|
||||
cur = cur->next;
|
||||
free(tmp->msg);
|
||||
free(tmp);
|
||||
}
|
||||
q->head = q->tail = NULL;
|
||||
pthread_mutex_unlock(&q->mtx);
|
||||
pthread_mutex_destroy(&q->mtx);
|
||||
}
|
||||
|
||||
// Etats machine
|
||||
typedef enum {
|
||||
STATE_STOPPED,
|
||||
STATE_RUNNING,
|
||||
STATE_ESTOP
|
||||
} machine_state_t;
|
||||
|
||||
// Variables globales de contrôle
|
||||
static atomic_int running = 1; // 0 = arrêt demandé
|
||||
static atomic_int estop_flag = 0; // 1 = E-STOP actif
|
||||
static queue_t orders_queue;
|
||||
static machine_state_t machine_state = STATE_STOPPED;
|
||||
|
||||
// Fonctions "appliquer" (à remplacer par actions réelles)
|
||||
static void apply_start(void) {
|
||||
if (machine_state != STATE_RUNNING) {
|
||||
printf("[MACHINE] -> START\n");
|
||||
// TODO: démarrer moteurs / sorties
|
||||
machine_state = STATE_RUNNING;
|
||||
}
|
||||
}
|
||||
|
||||
static void apply_stop(void) {
|
||||
if (machine_state != STATE_STOPPED) {
|
||||
printf("[MACHINE] -> STOP\n");
|
||||
// TODO: couper moteurs / sorties
|
||||
machine_state = STATE_STOPPED;
|
||||
}
|
||||
}
|
||||
|
||||
static void apply_estop(void) {
|
||||
if (machine_state != STATE_ESTOP) {
|
||||
printf("[MACHINE] -> E-STOP (arrêt d'urgence)\n");
|
||||
// TODO: couper puissance, sécurité
|
||||
machine_state = STATE_ESTOP;
|
||||
}
|
||||
}
|
||||
|
||||
// Thread machine : boucle déterministe
|
||||
void *machine_thread_fn(void *arg) {
|
||||
(void)arg;
|
||||
const int cycle_us = CYCLE_MS * 1000;
|
||||
while (atomic_load(&running)) {
|
||||
// 1) Vérifier estop immédiat
|
||||
if (atomic_load(&estop_flag)) {
|
||||
apply_estop();
|
||||
// On peut décider ici de vider la queue ou de la garder
|
||||
// mais on ignore les autres commandes jusqu'à reset.
|
||||
usleep(cycle_us);
|
||||
continue;
|
||||
}
|
||||
|
||||
// 2) Dépiler toute la file et ne garder que la dernière commande
|
||||
char *last = queue_pop_all_and_get_last(&orders_queue);
|
||||
if (last) {
|
||||
// Normaliser message (trim)
|
||||
if (strcmp(last, "START") == 0) {
|
||||
apply_start();
|
||||
} else if (strcmp(last, "STOP") == 0) {
|
||||
apply_stop();
|
||||
} else {
|
||||
printf("[MACHINE] Commande inconnue reçue: '%s'\n", last);
|
||||
}
|
||||
free(last);
|
||||
}
|
||||
|
||||
// 3) Exécuter la logique machine déterministe (boucle cycle)
|
||||
// TODO: lire capteurs, asservissements, sorties...
|
||||
|
||||
usleep(cycle_us);
|
||||
}
|
||||
|
||||
// Fin : safe stop
|
||||
apply_stop();
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// Thread AMQP : consomme messages et les place dans la queue
|
||||
// Si message == "E_STOP" => set estop_flag immédiatement (ne pas mettre en queue)
|
||||
|
||||
void *amqp_thread_fn(void *arg) {
|
||||
(void)arg;
|
||||
amqp_connection_state_t conn = amqp_new_connection();
|
||||
amqp_socket_t *socket = amqp_tcp_socket_new(conn);
|
||||
if (!socket) {
|
||||
fprintf(stderr, "[AMQP] Erreur: création socket\n");
|
||||
atomic_store(&running, 0);
|
||||
return NULL;
|
||||
}
|
||||
if (amqp_socket_open(socket, HOST, PORT)) {
|
||||
fprintf(stderr, "[AMQP] Erreur: ouverture connexion %s:%d\n", HOST, PORT);
|
||||
atomic_store(&running, 0);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
amqp_rpc_reply_t rpc_reply = amqp_login(conn, VHOST, 0, 131072, 60,
|
||||
AMQP_SASL_METHOD_PLAIN, USER, PASS);
|
||||
if (rpc_reply.reply_type != AMQP_RESPONSE_NORMAL) {
|
||||
fprintf(stderr, "[AMQP] Erreur login\n");
|
||||
atomic_store(&running, 0);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
amqp_channel_open(conn, 1);
|
||||
amqp_get_rpc_reply(conn);
|
||||
|
||||
// Déclarer la queue (idempotent)
|
||||
amqp_queue_declare(conn, 1, amqp_cstring_bytes(QUEUE),
|
||||
0, 0, 0, 1, amqp_empty_table());
|
||||
amqp_get_rpc_reply(conn);
|
||||
|
||||
// Démarrer la consommation
|
||||
amqp_basic_consume(conn, 1,
|
||||
amqp_cstring_bytes(QUEUE),
|
||||
amqp_empty_bytes, // consumer tag auto
|
||||
0, // no_local
|
||||
1, // no_ack = 1 (auto-ack) ; ajuster selon besoin
|
||||
0, // exclusive
|
||||
amqp_empty_table());
|
||||
amqp_get_rpc_reply(conn);
|
||||
|
||||
printf("[AMQP] En attente de messages sur '%s'...\n", QUEUE);
|
||||
|
||||
while (atomic_load(&running)) {
|
||||
amqp_envelope_t envelope;
|
||||
amqp_maybe_release_buffers(conn);
|
||||
|
||||
struct timeval timeout;
|
||||
timeout.tv_sec = 0;
|
||||
timeout.tv_usec = 500000; // 500 ms
|
||||
|
||||
rpc_reply = amqp_consume_message(conn, &envelope, &timeout, 0);
|
||||
|
||||
if (rpc_reply.reply_type == AMQP_RESPONSE_NORMAL) {
|
||||
// Message reçu
|
||||
size_t len = envelope.message.body.len;
|
||||
char *body = malloc(len + 1);
|
||||
memcpy(body, envelope.message.body.bytes, len);
|
||||
body[len] = '\0';
|
||||
|
||||
printf("[AMQP] Reçu: '%s'\n", body);
|
||||
|
||||
if (strcmp(body, "E_STOP") == 0) {
|
||||
// Priorité absolue : traiter immédiatement
|
||||
atomic_store(&estop_flag, 1);
|
||||
printf("[AMQP] E-STOP reçu : flag estop activé\n");
|
||||
free(body);
|
||||
} else {
|
||||
// Push dans la queue pour traitement au prochain cycle
|
||||
queue_push(&orders_queue, body);
|
||||
free(body);
|
||||
}
|
||||
|
||||
amqp_destroy_envelope(&envelope);
|
||||
} else if (rpc_reply.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION &&
|
||||
rpc_reply.library_error == AMQP_STATUS_TIMEOUT) {
|
||||
// timeout - pas de message
|
||||
// rien
|
||||
} else {
|
||||
// autre erreur - tentatives de reconnexion possibles
|
||||
fprintf(stderr, "[AMQP] Erreur consume (reconnexion nécessaire?)\n");
|
||||
sleep(1);
|
||||
// Ici on pourrait tenter de reconnecter proprement ; pour cet exemple,
|
||||
// on continue la boucle et laisse l'admin redémarrer si nécessaire
|
||||
}
|
||||
}
|
||||
|
||||
// Fermeture
|
||||
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
|
||||
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
|
||||
amqp_destroy_connection(conn);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// Handler SIGINT pour arrêt propre
|
||||
static void sigint_handler(int signum) {
|
||||
(void)signum;
|
||||
printf("[MAIN] SIGINT reçu : arrêt\n");
|
||||
atomic_store(&running, 0);
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
(void)argc; (void)argv;
|
||||
signal(SIGINT, sigint_handler);
|
||||
|
||||
queue_init(&orders_queue);
|
||||
|
||||
pthread_t th_amqp, th_machine;
|
||||
if (pthread_create(&th_amqp, NULL, amqp_thread_fn, NULL) != 0) {
|
||||
perror("pthread_create amqp");
|
||||
return 1;
|
||||
}
|
||||
if (pthread_create(&th_machine, NULL, machine_thread_fn, NULL) != 0) {
|
||||
perror("pthread_create machine");
|
||||
atomic_store(&running, 0);
|
||||
pthread_join(th_amqp, NULL);
|
||||
return 1;
|
||||
}
|
||||
|
||||
// Attendre threads
|
||||
pthread_join(th_amqp, NULL);
|
||||
pthread_join(th_machine, NULL);
|
||||
|
||||
queue_destroy(&orders_queue);
|
||||
|
||||
printf("[MAIN] Terminé.\n");
|
||||
return 0;
|
||||
}
|
||||
*/
|
||||
Reference in New Issue
Block a user