Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 3ce3cecc34 | |||
| 70aa2687d0 | |||
| 9d29b94a96 | |||
| 8c8dc55db4 |
@@ -15,13 +15,25 @@ RUN set -eux; \
|
||||
libmicrohttpd-dev \
|
||||
libcurl4-openssl-dev \
|
||||
zlib1g-dev \
|
||||
prometheus-cpp-dev
|
||||
prometheus-cpp-dev \
|
||||
nlohmann-json3-dev
|
||||
|
||||
|
||||
RUN set -eux; \
|
||||
apt-get update; \
|
||||
apt-get install -y \
|
||||
libpaho-mqtt-dev
|
||||
|
||||
RUN set -eux; \
|
||||
git clone https://github.com/eclipse/paho.mqtt.cpp.git; \
|
||||
cd paho.mqtt.cpp; \
|
||||
git submodule init; \
|
||||
git submodule update; \
|
||||
mkdir build && cd build; \
|
||||
cmake -DPAHO_WITH_MQTT_C=ON ..; \
|
||||
cmake --build . --target install; \
|
||||
ldconfig;
|
||||
|
||||
RUN set -eux; \
|
||||
apt-get update; \
|
||||
apt-get install -y \
|
||||
|
||||
@@ -52,9 +52,9 @@ typedef struct PinIO
|
||||
unsigned long time;
|
||||
double duration;
|
||||
unsigned int nb; // compteur d'activation
|
||||
int memory;
|
||||
unsigned char raising;
|
||||
unsigned char falling;
|
||||
int memory; // valeur précédente
|
||||
unsigned char raising; // front montant
|
||||
unsigned char falling; // front descendant
|
||||
} PinIO;
|
||||
|
||||
PinIO _digital[256];
|
||||
@@ -75,8 +75,6 @@ void pinMode(unsigned char p, unsigned char mode)
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/* KEYBOARD */
|
||||
|
||||
typedef struct
|
||||
@@ -323,7 +321,7 @@ class MiseAEchelle
|
||||
float minEntree;
|
||||
float minSortie;
|
||||
float maxEntree;
|
||||
float maxSortie;
|
||||
float maxSortie;
|
||||
};
|
||||
|
||||
/* READ */
|
||||
@@ -351,6 +349,8 @@ void digitalWrite(unsigned int p, int value)
|
||||
// En panne !
|
||||
if (!(_digital[p].mode & 0x01))
|
||||
{
|
||||
_digital[p].ivalue = 0;
|
||||
_digital[p].dvalue = 0.0;
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@@ -34,6 +34,7 @@ find_library(NCURSESW_LIB ncursesw REQUIRED)
|
||||
find_library(RABBITMQ_LIB rabbitmq REQUIRED)
|
||||
|
||||
# Paho MQTT C client
|
||||
find_library(PAHO_MQTTPP3_LIB paho-mqttpp3 REQUIRED)
|
||||
find_library(PAHO_MQTT3C_LIB paho-mqtt3c REQUIRED)
|
||||
|
||||
# -------------------------------
|
||||
@@ -53,5 +54,6 @@ target_link_libraries(geii_exporter
|
||||
${Z_LIB}
|
||||
${NCURSESW_LIB}
|
||||
${RABBITMQ_LIB}
|
||||
${PAHO_MQTT3C_LIB}
|
||||
${PAHO_MQTT3C_LIB} # dépendance C
|
||||
${PAHO_MQTTPP3_LIB} # lib C++
|
||||
)
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
#define TOPIC "test/topic"
|
||||
#define PAYLOAD "Hello MQTT"
|
||||
#define QOS 1
|
||||
#define TIMEOUT 10000L
|
||||
#define TIMEOUT_MQTT 10000L
|
||||
|
||||
int main() {
|
||||
MQTTClient client;
|
||||
@@ -33,7 +33,7 @@ int main() {
|
||||
MQTTClient_deliveryToken token;
|
||||
MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
|
||||
|
||||
MQTTClient_waitForCompletion(client, token, TIMEOUT);
|
||||
MQTTClient_waitForCompletion(client, token, TIMEOUT_MQTT);
|
||||
printf("Message publié !\n");
|
||||
|
||||
MQTTClient_disconnect(client, 10000);
|
||||
|
||||
163
main-mqtt2.cpp
Normal file
163
main-mqtt2.cpp
Normal file
@@ -0,0 +1,163 @@
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <atomic>
|
||||
#include <queue>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include <csignal>
|
||||
#include <chrono>
|
||||
#include <cstring>
|
||||
#include "mqtt/async_client.h"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
/* Configuration MQTT */
|
||||
const std::string ADDRESS = "tcp://rabbitmq:1883";
|
||||
const std::string CLIENTID = "CppClientTP";
|
||||
const std::string TOPIC = "geii/ordre/#";
|
||||
const int QOS = 1;
|
||||
const int CYCLE_MS = 100;
|
||||
|
||||
/* Queue thread-safe */
|
||||
std::queue<std::string> orders_queue;
|
||||
std::mutex queue_mtx;
|
||||
|
||||
std::string pop_all_and_get_last() {
|
||||
std::lock_guard<std::mutex> lock(queue_mtx);
|
||||
if (orders_queue.empty()) return "";
|
||||
std::string last;
|
||||
while (!orders_queue.empty()) {
|
||||
last = orders_queue.front();
|
||||
orders_queue.pop();
|
||||
}
|
||||
return last;
|
||||
}
|
||||
|
||||
void push_order(const std::string &msg) {
|
||||
std::lock_guard<std::mutex> lock(queue_mtx);
|
||||
orders_queue.push(msg);
|
||||
}
|
||||
|
||||
/* Etats machine */
|
||||
enum class MachineState { STOPPED, RUNNING, ESTOP };
|
||||
std::atomic<MachineState> machine_state(MachineState::STOPPED);
|
||||
std::atomic<bool> estop_flag(false);
|
||||
std::atomic<bool> running(true);
|
||||
|
||||
/* Fonctions d'application */
|
||||
void apply_start() {
|
||||
if(machine_state!=MachineState::RUNNING){
|
||||
std::cout << "[MACHINE] -> START\n";
|
||||
machine_state = MachineState::RUNNING;
|
||||
}
|
||||
}
|
||||
|
||||
void apply_stop() {
|
||||
if(machine_state!=MachineState::STOPPED){
|
||||
std::cout << "[MACHINE] -> STOP\n";
|
||||
machine_state = MachineState::STOPPED;
|
||||
}
|
||||
}
|
||||
|
||||
void apply_estop() {
|
||||
if(machine_state!=MachineState::ESTOP){
|
||||
std::cout << "[MACHINE] -> E-STOP\n";
|
||||
machine_state = MachineState::ESTOP;
|
||||
}
|
||||
}
|
||||
|
||||
/* Thread machine */
|
||||
void machine_thread_fn() {
|
||||
while(running) {
|
||||
if(estop_flag) {
|
||||
apply_estop();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(CYCLE_MS));
|
||||
continue;
|
||||
}
|
||||
std::string last = pop_all_and_get_last();
|
||||
if(!last.empty()) {
|
||||
if(last=="START") apply_start();
|
||||
else if(last=="STOP") apply_stop();
|
||||
else std::cout << "[MACHINE] Commande inconnue: '" << last << "'\n";
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(CYCLE_MS));
|
||||
}
|
||||
apply_stop();
|
||||
}
|
||||
|
||||
/* Callback MQTT */
|
||||
class callback : public virtual mqtt::callback {
|
||||
public:
|
||||
void message_arrived(mqtt::const_message_ptr msg) override {
|
||||
std::string payload = msg->to_string();
|
||||
if(payload == "E_STOP") {
|
||||
estop_flag = true;
|
||||
std::cout << "[MQTT] E-STOP reçu\n";
|
||||
} else {
|
||||
push_order(payload);
|
||||
std::cout << "[MQTT] Reçu: '" << payload << "'\n";
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/* SIGINT handler */
|
||||
void sigint_handler(int) {
|
||||
std::cout << "[MAIN] SIGINT reçu\n";
|
||||
running = false;
|
||||
}
|
||||
|
||||
int main() {
|
||||
std::signal(SIGINT, sigint_handler);
|
||||
|
||||
/* MQTT async client */
|
||||
mqtt::async_client client(ADDRESS, CLIENTID);
|
||||
callback cb;
|
||||
client.set_callback(cb);
|
||||
|
||||
mqtt::connect_options connOpts;
|
||||
connOpts.set_clean_session(true);
|
||||
connOpts.set_user_name("admin");
|
||||
connOpts.set_password("ChangeMe");
|
||||
try {
|
||||
client.connect(connOpts)->wait();
|
||||
client.start_consuming();
|
||||
client.subscribe(TOPIC, QOS)->wait();
|
||||
} catch (const mqtt::exception &exc) {
|
||||
std::cerr << "Erreur MQTT: " << exc.what() << "\n";
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Threads */
|
||||
std::thread th_machine(machine_thread_fn);
|
||||
|
||||
/* Boucle principale pour la réception */
|
||||
while(running) {
|
||||
|
||||
std::string payload = R"({
|
||||
"order": "STATUS",
|
||||
"speed": 120,
|
||||
"temperature": 36.1
|
||||
})";
|
||||
|
||||
auto msg = mqtt::make_message("geii/telemetry", payload);
|
||||
msg->set_qos(1);
|
||||
client.publish(msg);
|
||||
|
||||
std::this_thread::sleep_for(100ms);
|
||||
}
|
||||
|
||||
/* Arrêt */
|
||||
try {
|
||||
client.unsubscribe(TOPIC)->wait();
|
||||
client.stop_consuming();
|
||||
client.disconnect()->wait();
|
||||
} catch(const mqtt::exception &exc){
|
||||
std::cerr << "Erreur déconnexion MQTT: " << exc.what() << "\n";
|
||||
}
|
||||
|
||||
th_machine.join();
|
||||
|
||||
std::cout << "[MAIN] Terminé\n";
|
||||
return 0;
|
||||
}
|
||||
319
main.cpp
319
main.cpp
@@ -6,13 +6,46 @@
|
||||
#include "main.hpp"
|
||||
#include "AutomForArduino.cpp"
|
||||
|
||||
#include <prometheus/counter.h>
|
||||
#include <prometheus/gauge.h>
|
||||
#include <prometheus/histogram.h>
|
||||
#include <prometheus/registry.h>
|
||||
#include <prometheus/exposer.h>
|
||||
|
||||
#include <curl/curl.h>
|
||||
#include <string>
|
||||
#include <iostream>
|
||||
|
||||
#include <thread>
|
||||
#include <atomic>
|
||||
#include <queue>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include <csignal>
|
||||
#include <chrono>
|
||||
#include <cstring>
|
||||
#undef timeout
|
||||
#include "mqtt/async_client.h"
|
||||
|
||||
#include <nlohmann/json.hpp>
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
using json = nlohmann::json;
|
||||
|
||||
// Constantes de fonctionnement
|
||||
#define LEVEL_MIN 2
|
||||
#define FLOW_PER_PUMP 150
|
||||
|
||||
/* Configuration MQTT */
|
||||
const std::string ADDRESS = "tcp://rabbitmq:1883";
|
||||
const std::string CLIENTID = "CppClientTP";
|
||||
const std::string TOPIC = "geii/ordre/#";
|
||||
const int QOS = 1;
|
||||
const int CYCLE_MS = 100;
|
||||
|
||||
WINDOW *window;
|
||||
|
||||
int etape = 10; // Étape du grafcet : début Automatique
|
||||
int etape = 0; // Étape du grafcet : début Automatique
|
||||
int bp_mode, bp_mode_fm;
|
||||
unsigned short pompe1, pompe2, pompe3, pompe4; // bouton des pompes 0 (arrêt) / 1 (marche)
|
||||
unsigned short pompe1_old, pompe2_old, pompe3_old, pompe4_old;
|
||||
@@ -20,18 +53,192 @@ unsigned short sensor_max, sensor_high, sensor_low, sensor_min;
|
||||
|
||||
float TankInitalValue = 7;
|
||||
|
||||
TemporisationRetardMontee tempo1(500);
|
||||
TemporisationRetardMontee tempo2(1000);
|
||||
TemporisationRetardMontee tempo3(1500);
|
||||
TemporisationRetardMontee tempo4(2000);
|
||||
TemporisationRetardMontee tempo1(1500);
|
||||
TemporisationRetardMontee tempo2(3000);
|
||||
TemporisationRetardMontee tempo3(4000);
|
||||
TemporisationRetardMontee tempo4(6000);
|
||||
|
||||
// Prometheus
|
||||
// ************************************************************
|
||||
using namespace prometheus;
|
||||
|
||||
std::shared_ptr<Registry> registry;
|
||||
Gauge *debit_entree = nullptr;
|
||||
Gauge *debit_sortie = nullptr;
|
||||
|
||||
Gauge *debit_p1 = nullptr;
|
||||
Gauge *debit_p2 = nullptr;
|
||||
Gauge *debit_p3 = nullptr;
|
||||
Gauge *debit_p4 = nullptr;
|
||||
Gauge *tank_gauge = nullptr;
|
||||
|
||||
Counter *volume_p1 = nullptr;
|
||||
Counter *volume_p2 = nullptr;
|
||||
Counter *volume_p3 = nullptr;
|
||||
Counter *volume_p4 = nullptr;
|
||||
|
||||
Histogram::BucketBoundaries buckets = {
|
||||
2, 5, 6, 7, 8, 9, 9.5
|
||||
};
|
||||
|
||||
Histogram *tank_histogram = nullptr;
|
||||
// ************************************************************
|
||||
|
||||
|
||||
/* Queue thread-safe */
|
||||
std::queue<std::string> orders_queue;
|
||||
std::mutex queue_mtx;
|
||||
|
||||
std::string pop_all_and_get_last() {
|
||||
std::lock_guard<std::mutex> lock(queue_mtx);
|
||||
if (orders_queue.empty()) return "";
|
||||
std::string last;
|
||||
while (!orders_queue.empty()) {
|
||||
last = orders_queue.front();
|
||||
orders_queue.pop();
|
||||
}
|
||||
return last;
|
||||
}
|
||||
|
||||
void push_order(const std::string &msg) {
|
||||
std::lock_guard<std::mutex> lock(queue_mtx);
|
||||
orders_queue.push(msg);
|
||||
}
|
||||
|
||||
/* Etats machine */
|
||||
enum class MachineState { STOPPED, RUNNING, ESTOP };
|
||||
std::atomic<MachineState> machine_state(MachineState::STOPPED);
|
||||
std::atomic<bool> estop_flag(false);
|
||||
std::atomic<bool> running(true);
|
||||
|
||||
/* Fonctions d'application */
|
||||
void apply_start() {
|
||||
if(machine_state!=MachineState::RUNNING){
|
||||
std::cout << "[MACHINE] -> START\n";
|
||||
machine_state = MachineState::RUNNING;
|
||||
}
|
||||
}
|
||||
|
||||
void apply_stop() {
|
||||
if(machine_state!=MachineState::STOPPED){
|
||||
std::cout << "[MACHINE] -> STOP\n";
|
||||
machine_state = MachineState::STOPPED;
|
||||
}
|
||||
}
|
||||
|
||||
void apply_estop() {
|
||||
if(machine_state!=MachineState::ESTOP){
|
||||
std::cout << "[MACHINE] -> E-STOP\n";
|
||||
machine_state = MachineState::ESTOP;
|
||||
}
|
||||
}
|
||||
|
||||
/* Thread machine */
|
||||
void machine_thread_fn() {
|
||||
while(running) {
|
||||
if(estop_flag) {
|
||||
apply_estop();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(CYCLE_MS));
|
||||
continue;
|
||||
}
|
||||
pompe1 = 1;
|
||||
std::string last = pop_all_and_get_last();
|
||||
if(!last.empty()) {
|
||||
if(last=="START") apply_start();
|
||||
if(last=="P1") {
|
||||
pompe1 = 1;
|
||||
} else if(last=="P2") {
|
||||
pompe2 = 1;
|
||||
} else if(last=="P3") {
|
||||
pompe3 = 1;
|
||||
} else if(last=="P4") {
|
||||
pompe4 = 1;
|
||||
}
|
||||
else if(last=="STOP") apply_stop();
|
||||
else std::cout << "[MACHINE] Commande inconnue: '" << last << "'\n";
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(CYCLE_MS));
|
||||
}
|
||||
apply_stop();
|
||||
}
|
||||
|
||||
/* Callback MQTT */
|
||||
class callback : public virtual mqtt::callback {
|
||||
public:
|
||||
void message_arrived(mqtt::const_message_ptr msg) override {
|
||||
std::string payload = msg->to_string();
|
||||
if(payload == "E_STOP") {
|
||||
estop_flag = true;
|
||||
std::cout << "[MQTT] E-STOP reçu\n";
|
||||
} else if(payload == "P1") {
|
||||
pompe2 = 1;
|
||||
} else {
|
||||
push_order(payload);
|
||||
std::cout << "[MQTT] Reçu: '" << payload << "'\n";
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/* SIGINT handler */
|
||||
void sigint_handler(int) {
|
||||
std::cout << "[MAIN] SIGINT reçu\n";
|
||||
running = false;
|
||||
}
|
||||
|
||||
void send_to_influx(double value) {
|
||||
CURL* curl = curl_easy_init();
|
||||
if (!curl) {
|
||||
std::cerr << "Erreur CURL\n";
|
||||
return;
|
||||
}
|
||||
|
||||
// Line protocol
|
||||
std::string line = "machine_cycle,machine=convoyeur1 value=" + std::to_string(value);
|
||||
|
||||
// Endpoint InfluxDB 2.x
|
||||
std::string url =
|
||||
"http://influxdb:8086/api/v2/write?org=geii&bucket=mesures&precision=s";
|
||||
|
||||
struct curl_slist* headers = nullptr;
|
||||
headers = curl_slist_append(headers, "Authorization: Token MON_TOKEN");
|
||||
headers = curl_slist_append(headers, "Content-Type: text/plain");
|
||||
|
||||
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
|
||||
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
|
||||
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, line.c_str());
|
||||
|
||||
CURLcode res = curl_easy_perform(curl);
|
||||
if (res != CURLE_OK) {
|
||||
std::cerr << "Erreur CURL: " << curl_easy_strerror(res) << "\n";
|
||||
}
|
||||
|
||||
curl_slist_free_all(headers);
|
||||
curl_easy_cleanup(curl);
|
||||
}
|
||||
|
||||
|
||||
int main()
|
||||
{
|
||||
std::signal(SIGINT, sigint_handler);
|
||||
|
||||
/* MQTT async client */
|
||||
mqtt::async_client client(ADDRESS, CLIENTID);
|
||||
callback cb;
|
||||
client.set_callback(cb);
|
||||
|
||||
mqtt::connect_options connOpts;
|
||||
connOpts.set_clean_session(true);
|
||||
connOpts.set_user_name("admin");
|
||||
connOpts.set_password("ChangeMe");
|
||||
try {
|
||||
client.connect(connOpts)->wait();
|
||||
client.start_consuming();
|
||||
client.subscribe(TOPIC, QOS)->wait();
|
||||
} catch (const mqtt::exception &exc) {
|
||||
std::cerr << "Erreur MQTT: " << exc.what() << "\n";
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Initialisation */
|
||||
ConsoleInit();
|
||||
AffichageWindow();
|
||||
@@ -65,14 +272,26 @@ int main()
|
||||
Actions();
|
||||
|
||||
ProcessPrometheus();
|
||||
|
||||
ProcessMQTT(&client);
|
||||
ProcessException();
|
||||
|
||||
usleep(100000);
|
||||
}
|
||||
|
||||
endwin(); // Termine ncurses et rétablit le terminal
|
||||
puts("Fin du programme");
|
||||
|
||||
/* Arrêt */
|
||||
try {
|
||||
client.unsubscribe(TOPIC)->wait();
|
||||
client.stop_consuming();
|
||||
client.disconnect()->wait();
|
||||
} catch(const mqtt::exception &exc){
|
||||
std::cerr << "Erreur déconnexion MQTT: " << exc.what() << "\n";
|
||||
}
|
||||
|
||||
//th_machine.join();
|
||||
|
||||
std::cout << "[MAIN] Terminé\n";
|
||||
|
||||
return 0;
|
||||
}
|
||||
@@ -437,10 +656,11 @@ double ProcessMoteur(int i)
|
||||
|
||||
void ProcessException()
|
||||
{
|
||||
if (t_elapsed > 30) {
|
||||
if (t_elapsed > 60) {
|
||||
_digital[OUT_PUMP_1].mode = 0;
|
||||
digitalWrite(OUT_PUMP_1, 0);
|
||||
} else if (t_elapsed > 15) {
|
||||
_digital[IN_SENSOR_LOW].mode = 0;
|
||||
//_digital[IN_SENSOR_LOW].mode = 0;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -454,8 +674,8 @@ void Process()
|
||||
// ***** FLOW OUT
|
||||
if (_digital[IN_TANK_LEVEL].dvalue > 1.0)
|
||||
{
|
||||
//_digital[IN_FLOW_OUT].dvalue = SimulConsoSinusoidale(t);
|
||||
_digital[IN_FLOW_OUT].dvalue = SimulConsoBrown(_digital[IN_FLOW_OUT].dvalue);
|
||||
_digital[IN_FLOW_OUT].dvalue = SimulConsoSinusoidale(t);
|
||||
//_digital[IN_FLOW_OUT].dvalue = SimulConsoBrown(_digital[IN_FLOW_OUT].dvalue);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -502,8 +722,7 @@ void Process()
|
||||
unsigned char p = i + (OUT_PUMP_1 - IN_KEYBOARD_7);
|
||||
_digital[p].mode ^= 0x01;
|
||||
if (!(_digital[p].mode & 0x01)) {
|
||||
_digital[p].ivalue = 0;
|
||||
_digital[p].dvalue = 0.0;
|
||||
digitalWrite(p, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -770,11 +989,85 @@ void AffichageGraphe(int y, int x, double value)
|
||||
*/
|
||||
void InitPrometheus()
|
||||
{
|
||||
static Exposer exposer{"0.0.0.0:8099"};
|
||||
|
||||
// Le registre central
|
||||
registry = std::make_shared<Registry>();
|
||||
|
||||
exposer.RegisterCollectable(registry);
|
||||
|
||||
auto& gauge_volume = BuildGauge()
|
||||
.Name("geii_volume")
|
||||
.Help("Volume en m3")
|
||||
.Register(*registry);
|
||||
|
||||
tank_gauge = &gauge_volume.Add({});
|
||||
|
||||
auto& gauge_debit = BuildGauge()
|
||||
.Name("geii_debit")
|
||||
.Help("Débit en l/s")
|
||||
.Register(*registry);
|
||||
|
||||
debit_entree = &gauge_debit.Add({{"numero", "entree"}});
|
||||
debit_sortie = &gauge_debit.Add({{"numero", "sortie"}});
|
||||
|
||||
debit_p1 = &gauge_debit.Add({{"numero", "1"}});
|
||||
debit_p2 = &gauge_debit.Add({{"numero", "2"}});
|
||||
debit_p3 = &gauge_debit.Add({{"numero", "3"}});
|
||||
debit_p4 = &gauge_debit.Add({{"numero", "4"}});
|
||||
|
||||
auto& counter_debit = BuildCounter()
|
||||
.Name("geii_litre")
|
||||
.Help("Volume en l")
|
||||
.Register(*registry);
|
||||
|
||||
volume_p1 = &counter_debit.Add({{"numero", "1"}});
|
||||
volume_p2 = &counter_debit.Add({{"numero", "2"}});
|
||||
volume_p3 = &counter_debit.Add({{"numero", "3"}});
|
||||
volume_p4 = &counter_debit.Add({{"numero", "4"}});
|
||||
|
||||
auto& hist_volume = BuildHistogram()
|
||||
.Name("geii_tank")
|
||||
.Help("volume du reservoir en m3")
|
||||
.Register(*registry);
|
||||
|
||||
tank_histogram = &hist_volume.Add({}, buckets);
|
||||
}
|
||||
|
||||
void ProcessMQTT(mqtt::async_client* client)
|
||||
{
|
||||
json obj = {
|
||||
{"entree", _digital[IN_FLOW_IN].dvalue},
|
||||
{"sortie", _digital[IN_FLOW_OUT].dvalue},
|
||||
{"p1", _digital[IN_FLOW_1].dvalue},
|
||||
{"p2", _digital[IN_FLOW_2].dvalue},
|
||||
{"p3", _digital[IN_FLOW_3].dvalue},
|
||||
{"p4", _digital[IN_FLOW_4].dvalue},
|
||||
{"level", _digital[IN_TANK_LEVEL].dvalue}
|
||||
};
|
||||
|
||||
std::string payload = obj.dump();
|
||||
|
||||
auto msg = mqtt::make_message("geii/telemetry", payload);
|
||||
msg->set_qos(1);
|
||||
client->publish(msg);
|
||||
}
|
||||
|
||||
void ProcessPrometheus()
|
||||
{
|
||||
tank_gauge->Set(_digital[IN_TANK_LEVEL].dvalue);
|
||||
tank_histogram->Observe(_digital[IN_TANK_LEVEL].dvalue);
|
||||
|
||||
debit_entree->Set(_digital[IN_FLOW_IN].dvalue);
|
||||
debit_sortie->Set(_digital[IN_FLOW_OUT].dvalue);
|
||||
|
||||
debit_p1->Set(_digital[IN_FLOW_1].dvalue);
|
||||
debit_p2->Set(_digital[IN_FLOW_2].dvalue);
|
||||
debit_p3->Set(_digital[IN_FLOW_3].dvalue);
|
||||
debit_p4->Set(_digital[IN_FLOW_4].dvalue);
|
||||
|
||||
volume_p1->Increment(_digital[IN_FLOW_1].dvalue * dt);
|
||||
volume_p2->Increment(_digital[IN_FLOW_2].dvalue * dt);
|
||||
volume_p3->Increment(_digital[IN_FLOW_3].dvalue * dt);
|
||||
volume_p4->Increment(_digital[IN_FLOW_4].dvalue * dt);
|
||||
}
|
||||
|
||||
5
main.hpp
5
main.hpp
@@ -1,3 +1,6 @@
|
||||
#undef timeout
|
||||
#include "mqtt/async_client.h"
|
||||
|
||||
void ConsoleInit();
|
||||
|
||||
void LireClavier(int ch);
|
||||
@@ -16,6 +19,8 @@ void Process();
|
||||
void InitPrometheus();
|
||||
void ProcessPrometheus();
|
||||
|
||||
void ProcessMQTT(mqtt::async_client* client);
|
||||
|
||||
double SimulConsoSinusoidale(long t);
|
||||
double SimulConsoBrown(double valeur_precedente);
|
||||
|
||||
|
||||
315
thread.c
Normal file
315
thread.c
Normal file
@@ -0,0 +1,315 @@
|
||||
/*
|
||||
* amqp_machine.c
|
||||
*
|
||||
* Exemple multithread AMQP (rabbitmq-c) + boucle machine déterministe.
|
||||
*
|
||||
* Compilation :
|
||||
* gcc amqp_machine.c -lrabbitmq -lpthread -o amqp_machine
|
||||
*
|
||||
* Pré-requis :
|
||||
* librabbitmq-dev (rabbitmq-c)
|
||||
*
|
||||
* Test de publication :
|
||||
* rabbitmqadmin publish exchange=amq.default routing_key=geii_orders payload="START"
|
||||
* rabbitmqadmin publish exchange=amq.default routing_key=geii_orders payload="STOP"
|
||||
* rabbitmqadmin publish exchange=amq.default routing_key=geii_orders payload="E_STOP"
|
||||
*
|
||||
*/
|
||||
|
||||
/*
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
#include <signal.h>
|
||||
#include <pthread.h>
|
||||
#include <stdatomic.h>
|
||||
|
||||
#include <amqp.h>
|
||||
#include <amqp_tcp_socket.h>
|
||||
|
||||
// Connexion AMQP
|
||||
#define HOST "localhost"
|
||||
#define PORT 5672
|
||||
#define USER "guest"
|
||||
#define PASS "guest"
|
||||
#define VHOST "/"
|
||||
#define QUEUE "geii_orders"
|
||||
|
||||
// Boucle machine
|
||||
#define CYCLE_MS 100
|
||||
|
||||
// Structures de la queue interne
|
||||
typedef struct node {
|
||||
char *msg;
|
||||
struct node *next;
|
||||
} node_t;
|
||||
|
||||
typedef struct {
|
||||
node_t *head;
|
||||
node_t *tail;
|
||||
pthread_mutex_t mtx;
|
||||
} queue_t;
|
||||
|
||||
static void queue_init(queue_t *q) {
|
||||
q->head = q->tail = NULL;
|
||||
pthread_mutex_init(&q->mtx, NULL);
|
||||
}
|
||||
|
||||
static void queue_push(queue_t *q, const char *s) {
|
||||
node_t *n = malloc(sizeof(node_t));
|
||||
n->msg = strdup(s);
|
||||
n->next = NULL;
|
||||
pthread_mutex_lock(&q->mtx);
|
||||
if (q->tail) q->tail->next = n;
|
||||
q->tail = n;
|
||||
if (!q->head) q->head = n;
|
||||
pthread_mutex_unlock(&q->mtx);
|
||||
}
|
||||
|
||||
static char *queue_pop_all_and_get_last(queue_t *q) {
|
||||
// Vide la queue et retourne la dernière chaine (caller doit free()).
|
||||
// Retourne NULL si queue vide.
|
||||
pthread_mutex_lock(&q->mtx);
|
||||
if (!q->head) {
|
||||
pthread_mutex_unlock(&q->mtx);
|
||||
return NULL;
|
||||
}
|
||||
char *last = NULL;
|
||||
node_t *cur = q->head;
|
||||
while (cur) {
|
||||
if (last) free(last);
|
||||
last = strdup(cur->msg);
|
||||
node_t *tmp = cur;
|
||||
cur = cur->next;
|
||||
free(tmp->msg);
|
||||
free(tmp);
|
||||
}
|
||||
q->head = q->tail = NULL;
|
||||
pthread_mutex_unlock(&q->mtx);
|
||||
return last;
|
||||
}
|
||||
|
||||
static void queue_destroy(queue_t *q) {
|
||||
pthread_mutex_lock(&q->mtx);
|
||||
node_t *cur = q->head;
|
||||
while (cur) {
|
||||
node_t *tmp = cur;
|
||||
cur = cur->next;
|
||||
free(tmp->msg);
|
||||
free(tmp);
|
||||
}
|
||||
q->head = q->tail = NULL;
|
||||
pthread_mutex_unlock(&q->mtx);
|
||||
pthread_mutex_destroy(&q->mtx);
|
||||
}
|
||||
|
||||
// Etats machine
|
||||
typedef enum {
|
||||
STATE_STOPPED,
|
||||
STATE_RUNNING,
|
||||
STATE_ESTOP
|
||||
} machine_state_t;
|
||||
|
||||
// Variables globales de contrôle
|
||||
static atomic_int running = 1; // 0 = arrêt demandé
|
||||
static atomic_int estop_flag = 0; // 1 = E-STOP actif
|
||||
static queue_t orders_queue;
|
||||
static machine_state_t machine_state = STATE_STOPPED;
|
||||
|
||||
// Fonctions "appliquer" (à remplacer par actions réelles)
|
||||
static void apply_start(void) {
|
||||
if (machine_state != STATE_RUNNING) {
|
||||
printf("[MACHINE] -> START\n");
|
||||
// TODO: démarrer moteurs / sorties
|
||||
machine_state = STATE_RUNNING;
|
||||
}
|
||||
}
|
||||
|
||||
static void apply_stop(void) {
|
||||
if (machine_state != STATE_STOPPED) {
|
||||
printf("[MACHINE] -> STOP\n");
|
||||
// TODO: couper moteurs / sorties
|
||||
machine_state = STATE_STOPPED;
|
||||
}
|
||||
}
|
||||
|
||||
static void apply_estop(void) {
|
||||
if (machine_state != STATE_ESTOP) {
|
||||
printf("[MACHINE] -> E-STOP (arrêt d'urgence)\n");
|
||||
// TODO: couper puissance, sécurité
|
||||
machine_state = STATE_ESTOP;
|
||||
}
|
||||
}
|
||||
|
||||
// Thread machine : boucle déterministe
|
||||
void *machine_thread_fn(void *arg) {
|
||||
(void)arg;
|
||||
const int cycle_us = CYCLE_MS * 1000;
|
||||
while (atomic_load(&running)) {
|
||||
// 1) Vérifier estop immédiat
|
||||
if (atomic_load(&estop_flag)) {
|
||||
apply_estop();
|
||||
// On peut décider ici de vider la queue ou de la garder
|
||||
// mais on ignore les autres commandes jusqu'à reset.
|
||||
usleep(cycle_us);
|
||||
continue;
|
||||
}
|
||||
|
||||
// 2) Dépiler toute la file et ne garder que la dernière commande
|
||||
char *last = queue_pop_all_and_get_last(&orders_queue);
|
||||
if (last) {
|
||||
// Normaliser message (trim)
|
||||
if (strcmp(last, "START") == 0) {
|
||||
apply_start();
|
||||
} else if (strcmp(last, "STOP") == 0) {
|
||||
apply_stop();
|
||||
} else {
|
||||
printf("[MACHINE] Commande inconnue reçue: '%s'\n", last);
|
||||
}
|
||||
free(last);
|
||||
}
|
||||
|
||||
// 3) Exécuter la logique machine déterministe (boucle cycle)
|
||||
// TODO: lire capteurs, asservissements, sorties...
|
||||
|
||||
usleep(cycle_us);
|
||||
}
|
||||
|
||||
// Fin : safe stop
|
||||
apply_stop();
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// Thread AMQP : consomme messages et les place dans la queue
|
||||
// Si message == "E_STOP" => set estop_flag immédiatement (ne pas mettre en queue)
|
||||
|
||||
void *amqp_thread_fn(void *arg) {
|
||||
(void)arg;
|
||||
amqp_connection_state_t conn = amqp_new_connection();
|
||||
amqp_socket_t *socket = amqp_tcp_socket_new(conn);
|
||||
if (!socket) {
|
||||
fprintf(stderr, "[AMQP] Erreur: création socket\n");
|
||||
atomic_store(&running, 0);
|
||||
return NULL;
|
||||
}
|
||||
if (amqp_socket_open(socket, HOST, PORT)) {
|
||||
fprintf(stderr, "[AMQP] Erreur: ouverture connexion %s:%d\n", HOST, PORT);
|
||||
atomic_store(&running, 0);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
amqp_rpc_reply_t rpc_reply = amqp_login(conn, VHOST, 0, 131072, 60,
|
||||
AMQP_SASL_METHOD_PLAIN, USER, PASS);
|
||||
if (rpc_reply.reply_type != AMQP_RESPONSE_NORMAL) {
|
||||
fprintf(stderr, "[AMQP] Erreur login\n");
|
||||
atomic_store(&running, 0);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
amqp_channel_open(conn, 1);
|
||||
amqp_get_rpc_reply(conn);
|
||||
|
||||
// Déclarer la queue (idempotent)
|
||||
amqp_queue_declare(conn, 1, amqp_cstring_bytes(QUEUE),
|
||||
0, 0, 0, 1, amqp_empty_table());
|
||||
amqp_get_rpc_reply(conn);
|
||||
|
||||
// Démarrer la consommation
|
||||
amqp_basic_consume(conn, 1,
|
||||
amqp_cstring_bytes(QUEUE),
|
||||
amqp_empty_bytes, // consumer tag auto
|
||||
0, // no_local
|
||||
1, // no_ack = 1 (auto-ack) ; ajuster selon besoin
|
||||
0, // exclusive
|
||||
amqp_empty_table());
|
||||
amqp_get_rpc_reply(conn);
|
||||
|
||||
printf("[AMQP] En attente de messages sur '%s'...\n", QUEUE);
|
||||
|
||||
while (atomic_load(&running)) {
|
||||
amqp_envelope_t envelope;
|
||||
amqp_maybe_release_buffers(conn);
|
||||
|
||||
struct timeval timeout;
|
||||
timeout.tv_sec = 0;
|
||||
timeout.tv_usec = 500000; // 500 ms
|
||||
|
||||
rpc_reply = amqp_consume_message(conn, &envelope, &timeout, 0);
|
||||
|
||||
if (rpc_reply.reply_type == AMQP_RESPONSE_NORMAL) {
|
||||
// Message reçu
|
||||
size_t len = envelope.message.body.len;
|
||||
char *body = malloc(len + 1);
|
||||
memcpy(body, envelope.message.body.bytes, len);
|
||||
body[len] = '\0';
|
||||
|
||||
printf("[AMQP] Reçu: '%s'\n", body);
|
||||
|
||||
if (strcmp(body, "E_STOP") == 0) {
|
||||
// Priorité absolue : traiter immédiatement
|
||||
atomic_store(&estop_flag, 1);
|
||||
printf("[AMQP] E-STOP reçu : flag estop activé\n");
|
||||
free(body);
|
||||
} else {
|
||||
// Push dans la queue pour traitement au prochain cycle
|
||||
queue_push(&orders_queue, body);
|
||||
free(body);
|
||||
}
|
||||
|
||||
amqp_destroy_envelope(&envelope);
|
||||
} else if (rpc_reply.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION &&
|
||||
rpc_reply.library_error == AMQP_STATUS_TIMEOUT) {
|
||||
// timeout - pas de message
|
||||
// rien
|
||||
} else {
|
||||
// autre erreur - tentatives de reconnexion possibles
|
||||
fprintf(stderr, "[AMQP] Erreur consume (reconnexion nécessaire?)\n");
|
||||
sleep(1);
|
||||
// Ici on pourrait tenter de reconnecter proprement ; pour cet exemple,
|
||||
// on continue la boucle et laisse l'admin redémarrer si nécessaire
|
||||
}
|
||||
}
|
||||
|
||||
// Fermeture
|
||||
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
|
||||
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
|
||||
amqp_destroy_connection(conn);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// Handler SIGINT pour arrêt propre
|
||||
static void sigint_handler(int signum) {
|
||||
(void)signum;
|
||||
printf("[MAIN] SIGINT reçu : arrêt\n");
|
||||
atomic_store(&running, 0);
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
(void)argc; (void)argv;
|
||||
signal(SIGINT, sigint_handler);
|
||||
|
||||
queue_init(&orders_queue);
|
||||
|
||||
pthread_t th_amqp, th_machine;
|
||||
if (pthread_create(&th_amqp, NULL, amqp_thread_fn, NULL) != 0) {
|
||||
perror("pthread_create amqp");
|
||||
return 1;
|
||||
}
|
||||
if (pthread_create(&th_machine, NULL, machine_thread_fn, NULL) != 0) {
|
||||
perror("pthread_create machine");
|
||||
atomic_store(&running, 0);
|
||||
pthread_join(th_amqp, NULL);
|
||||
return 1;
|
||||
}
|
||||
|
||||
// Attendre threads
|
||||
pthread_join(th_amqp, NULL);
|
||||
pthread_join(th_machine, NULL);
|
||||
|
||||
queue_destroy(&orders_queue);
|
||||
|
||||
printf("[MAIN] Terminé.\n");
|
||||
return 0;
|
||||
}
|
||||
*/
|
||||
Reference in New Issue
Block a user