From f1d9abc4c7ac1f7457196da76361e73daddbb10d Mon Sep 17 00:00:00 2001 From: Tobias Maier Date: Mon, 30 Mar 2026 18:34:14 +0200 Subject: [PATCH] Switched to queues for webserver --- src/main.cpp | 26 +++++++---- src/networking/json_builder.cpp | 1 - src/networking/webserver.cpp | 76 ++++++++++++++++++++++++++----- src/networking/webserver.h | 3 +- src/sensor/sensor.cpp | 14 ++++++ src/tools/readers_writer_lock.cpp | 70 ++++++++++++++++++++++++++++ src/tools/readers_writer_lock.h | 65 ++++++++++++++++++++++++++ 7 files changed, 231 insertions(+), 24 deletions(-) create mode 100644 src/tools/readers_writer_lock.cpp create mode 100644 src/tools/readers_writer_lock.h diff --git a/src/main.cpp b/src/main.cpp index 60a716d..f5ef096 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -31,7 +31,6 @@ Preferences prefs; -extern WaterData water_data; extern DeviceTelemetry telemetry; extern NetworkData wifi_data; extern NetworkData ethernet_data; @@ -101,16 +100,23 @@ void setup() LOG(ELOG_LEVEL_DEBUG, "Starting main tasks"); - xTaskCreate(ethernet_task, "EthernetTask", 4096, NULL, 1, NULL); - xTaskCreate(wifi_task, "WiFiTask", 10000, NULL, 1, NULL); - xTaskCreate(read_sensor_task, "ReadSensorTask", 1024 * 4, NULL, 1, NULL); - xTaskCreate(collect_internal_telemetry_task, "InternalTelemetryTask", 2048, NULL, 1, NULL); - xTaskCreate(display_task, "DisplayTask", 10000, NULL, 1, NULL); - xTaskCreate(get_time_task, "GetTimeTask", 1024 * 4, NULL, 1, NULL); + + // Create a queue for water data communication between sensor and webserver tasks + QueueHandle_t webserverWaterDataQueue = xQueueCreate(10, sizeof(WaterData)); + if (webserverWaterDataQueue == NULL) { + LOG(ELOG_LEVEL_ERROR, "Failed to create webserver water data queue"); + } else { + xTaskCreate(ethernet_task, "EthernetTask", 4096, NULL, 1, NULL); + xTaskCreate(wifi_task, "WiFiTask", 10000, NULL, 1, NULL); + xTaskCreate(read_sensor_task, "ReadSensorTask", 1024 * 4, webserverWaterDataQueue, 1, NULL); + xTaskCreate(collect_internal_telemetry_task, "InternalTelemetryTask", 2048, NULL, 1, NULL); + xTaskCreate(display_task, "DisplayTask", 10000, NULL, 1, NULL); + xTaskCreate(get_time_task, "GetTimeTask", 1024 * 4, NULL, 1, NULL); - delay(5000); - xTaskCreate(check_update_task, "CheckUpdateTask", 1024 * 8, NULL, 1, NULL); - xTaskCreate(webserver_task, "WebServerTask", 1024 * 8, NULL, 1, NULL); + delay(5000); + xTaskCreate(check_update_task, "CheckUpdateTask", 1024 * 8, NULL, 1, NULL); + xTaskCreate(webserver_task, "WebServerTask", 1024 * 8, webserverWaterDataQueue, 1, NULL); + } LOG(ELOG_LEVEL_DEBUG, "Starting OTA handler"); ArduinoOTA.begin(); diff --git a/src/networking/json_builder.cpp b/src/networking/json_builder.cpp index 55705ab..84e7f53 100644 --- a/src/networking/json_builder.cpp +++ b/src/networking/json_builder.cpp @@ -1,7 +1,6 @@ #include "json_builder.h" #include -extern WaterData water_data; extern DeviceTelemetry telemetry; extern NetworkData wifi_data; extern NetworkData ethernet_data; diff --git a/src/networking/webserver.cpp b/src/networking/webserver.cpp index cef674e..cc56ad6 100644 --- a/src/networking/webserver.cpp +++ b/src/networking/webserver.cpp @@ -10,14 +10,17 @@ #include "AsyncJson.h" #include #include +#include +#include +#include #include "../global_data/global_data.h" #include "../tools/tools.h" +#include "../tools/readers_writer_lock.h" #include "../global_data/defines.h" #include "json_builder.h" extern Preferences prefs; -extern WaterData water_data; extern DeviceTelemetry telemetry; extern NetworkData wifi_data; extern NetworkData ethernet_data; @@ -27,6 +30,15 @@ extern OTAStatus ota_status; AsyncWebSocket webSocket("/webSocket"); AsyncWebServer server(80); +// Local water data cache +WaterData local_water_data; + +// Readers-writer lock for local_water_data +ReadersWriterLock waterDataLock; + +// Queue to receive water data from the sensor task +QueueHandle_t webserverWaterDataQueue; + // ====================== // Webserver Setup @@ -84,19 +96,26 @@ void setup_api_endpoints(){ request->send(200, "application/json", output); }); server.on("/water_data", HTTP_GET, [](AsyncWebServerRequest* request) { + rwLockAcquireRead(&waterDataLock); String output; - serializeJson(build_water_data_json(water_data), output); - request->send(200, "application/json", output); }); + serializeJson(build_water_data_json(local_water_data), output); + rwLockReleaseRead(&waterDataLock); + request->send(200, "application/json", output); + }); server.on("/raw_percent", HTTP_GET, [](AsyncWebServerRequest* request) { - String output; - output = water_data.percentage; - request->send(200, "text/raw", output); }); + rwLockAcquireRead(&waterDataLock); + String output = String(local_water_data.percentage); + rwLockReleaseRead(&waterDataLock); + request->send(200, "text/raw", output); + }); server.on("/raw_level", HTTP_GET, [](AsyncWebServerRequest* request) { - String output; - output = water_data.level; - request->send(200, "text/raw", output); }); + rwLockAcquireRead(&waterDataLock); + String output = String(local_water_data.level); + rwLockReleaseRead(&waterDataLock); + request->send(200, "text/raw", output); + }); server.on("/telemetry", HTTP_GET, [](AsyncWebServerRequest* request) { String output; @@ -215,18 +234,51 @@ void handle_update_sensor_settings(AsyncWebServerRequest* request) { /** * @brief Main task for the webserver. * - * Initializes all routes and starts the webserver. + * Initializes all routes, starts the webserver, and sets up local water data. + * Receives water data from the sensor task via a queue and updates local_water_data. * Runs indefinitely to keep the server active. * - * @param pvParameters Task parameters (unused). + * @param pvParameters Task parameters (expected to be a QueueHandle_t for the water data queue). */ void webserver_task(void *pvParameters) { + // Extract the queue handle from the task parameters + webserverWaterDataQueue = (QueueHandle_t)pvParameters; + if (webserverWaterDataQueue == NULL) { + LOG(ELOG_LEVEL_ERROR, "Webserver water data queue is NULL"); + vTaskDelete(NULL); + return; + } + LOG(ELOG_LEVEL_DEBUG, "Setting up routes"); setup_routes(); + // Initialize the readers-writer lock + if (!rwLockInit(&waterDataLock)) { + LOG(ELOG_LEVEL_ERROR, "Failed to initialize readers-writer lock"); + vTaskDelete(NULL); + return; + } + + // Initialize local water data with static values + rwLockAcquireWrite(&waterDataLock); + local_water_data.level = 50.0f; + local_water_data.liters = 100.0f; + local_water_data.percentage = 50.0f; + rwLockReleaseWrite(&waterDataLock); + LOG(ELOG_LEVEL_DEBUG, "Starting webserver"); server.begin(); + while (1) { - vTaskDelay(portMAX_DELAY); + // Check if there is new water data in the queue + WaterData newWaterData; + if (xQueueReceive(webserverWaterDataQueue, &newWaterData, 0) == pdTRUE) { + // Update local_water_data with the new data from the queue + rwLockAcquireWrite(&waterDataLock); + local_water_data = newWaterData; + rwLockReleaseWrite(&waterDataLock); + } + + vTaskDelay(100 / portTICK_PERIOD_MS); // Small delay to reduce CPU usage } } \ No newline at end of file diff --git a/src/networking/webserver.h b/src/networking/webserver.h index 17776c6..21d83ec 100644 --- a/src/networking/webserver.h +++ b/src/networking/webserver.h @@ -1,6 +1,5 @@ #ifndef ASYNC_WEBSERVER_H #define ASYNC_WEBSERVER_H -#endif // ASYNC_WEBSERVER_H #include #include @@ -13,3 +12,5 @@ void webserver_task(void *pvParameters); void handle_update_wifi_credentials(AsyncWebServerRequest* request); void handle_update_sensor_settings(AsyncWebServerRequest* request); +#endif // ASYNC_WEBSERVER_H + diff --git a/src/sensor/sensor.cpp b/src/sensor/sensor.cpp index 6c2abca..66c83cc 100644 --- a/src/sensor/sensor.cpp +++ b/src/sensor/sensor.cpp @@ -3,6 +3,7 @@ #include #include "Wire.h" #include "../global_data/global_data.h" +#include "../networking/webserver.h" #ifdef USE_INA226 @@ -47,6 +48,14 @@ void init_sensor(){ void read_sensor_task(void* parameter) { + // Extract the queue handle from the task parameters + QueueHandle_t webserverWaterDataQueue = (QueueHandle_t)parameter; + if (webserverWaterDataQueue == NULL) { + LOG(ELOG_LEVEL_ERROR, "Webserver water data queue is NULL"); + vTaskDelete(NULL); + return; + } + LOG(ELOG_LEVEL_DEBUG, "Starting read sensor tasks"); while (true) { // Get Values from sensor @@ -111,6 +120,11 @@ void read_sensor_task(void* parameter) water_data.liters = liters; water_data.percentage = percentage_rounded; + // Send the water data to the webserver task via the queue + if (xQueueSend(webserverWaterDataQueue, &water_data, 0) != pdTRUE) { + LOG(ELOG_LEVEL_ERROR, "Failed to send water data to webserver queue"); + } + delay(20000); } } \ No newline at end of file diff --git a/src/tools/readers_writer_lock.cpp b/src/tools/readers_writer_lock.cpp new file mode 100644 index 0000000..4c75541 --- /dev/null +++ b/src/tools/readers_writer_lock.cpp @@ -0,0 +1,70 @@ +#include "readers_writer_lock.h" +#include + +bool rwLockInit(ReadersWriterLock* rwLock) { + if (rwLock == NULL) { + return false; + } + + rwLock->mutex = xSemaphoreCreateMutex(); + if (rwLock->mutex == NULL) { + return false; + } + + rwLock->readersCount = 0; + rwLock->writerActive = false; + + return true; +} + +void rwLockAcquireRead(ReadersWriterLock* rwLock) { + if (rwLock == NULL) { + return; + } + + xSemaphoreTake(rwLock->mutex, portMAX_DELAY); + // Wait until no writer is active + while (rwLock->writerActive) { + xSemaphoreGive(rwLock->mutex); + vTaskDelay(10 / portTICK_PERIOD_MS); // Small delay to avoid busy-waiting + xSemaphoreTake(rwLock->mutex, portMAX_DELAY); + } + rwLock->readersCount++; + xSemaphoreGive(rwLock->mutex); +} + +void rwLockReleaseRead(ReadersWriterLock* rwLock) { + if (rwLock == NULL) { + return; + } + + xSemaphoreTake(rwLock->mutex, portMAX_DELAY); + rwLock->readersCount--; + xSemaphoreGive(rwLock->mutex); +} + +void rwLockAcquireWrite(ReadersWriterLock* rwLock) { + if (rwLock == NULL) { + return; + } + + xSemaphoreTake(rwLock->mutex, portMAX_DELAY); + // Set the writer active flag to block new readers + rwLock->writerActive = true; + // Wait until no readers are active + while (rwLock->readersCount > 0) { + xSemaphoreGive(rwLock->mutex); + vTaskDelay(10 / portTICK_PERIOD_MS); // Small delay to avoid busy-waiting + xSemaphoreTake(rwLock->mutex, portMAX_DELAY); + } + // Writer now has exclusive access +} + +void rwLockReleaseWrite(ReadersWriterLock* rwLock) { + if (rwLock == NULL) { + return; + } + + rwLock->writerActive = false; + xSemaphoreGive(rwLock->mutex); +} diff --git a/src/tools/readers_writer_lock.h b/src/tools/readers_writer_lock.h new file mode 100644 index 0000000..55b7d58 --- /dev/null +++ b/src/tools/readers_writer_lock.h @@ -0,0 +1,65 @@ +#ifndef READERS_WRITER_LOCK_H +#define READERS_WRITER_LOCK_H + +#include +#include + +/** + * @brief Readers-Writer Lock structure. + * + * This structure holds the state for a readers-writer lock, + * allowing multiple readers or a single writer to access a shared resource. + */ +typedef struct { + SemaphoreHandle_t mutex; /**< Mutex to protect the lock state. */ + int readersCount; /**< Number of active readers. */ + bool writerActive; /**< Flag to indicate if a writer is active. */ +} ReadersWriterLock; + +/** + * @brief Initialize a readers-writer lock. + * + * Initializes the mutex and sets the initial state of the lock. + * + * @param rwLock Pointer to the ReadersWriterLock structure to initialize. + * @return true if initialization succeeded, false otherwise. + */ +bool rwLockInit(ReadersWriterLock* rwLock); + +/** + * @brief Acquire a read lock. + * + * Blocks if a writer is active, otherwise allows multiple readers to access the data simultaneously. + * + * @param rwLock Pointer to the ReadersWriterLock structure. + */ +void rwLockAcquireRead(ReadersWriterLock* rwLock); + +/** + * @brief Release a read lock. + * + * Decrements the readers count. + * + * @param rwLock Pointer to the ReadersWriterLock structure. + */ +void rwLockReleaseRead(ReadersWriterLock* rwLock); + +/** + * @brief Acquire a write lock. + * + * Blocks until all readers have finished and sets the writer active flag. + * + * @param rwLock Pointer to the ReadersWriterLock structure. + */ +void rwLockAcquireWrite(ReadersWriterLock* rwLock); + +/** + * @brief Release a write lock. + * + * Clears the writer active flag and allows readers and writers. + * + * @param rwLock Pointer to the ReadersWriterLock structure. + */ +void rwLockReleaseWrite(ReadersWriterLock* rwLock); + +#endif // READERS_WRITER_LOCK_H