diff --git a/build.rs b/build.rs index 7609593..d506869 100644 --- a/build.rs +++ b/build.rs @@ -2,4 +2,4 @@ fn main() { // trigger recompilation when a new migration is added println!("cargo:rerun-if-changed=migrations"); -} \ No newline at end of file +} diff --git a/migrations/3_values.sql b/migrations/3_values.sql new file mode 100644 index 0000000..0f93283 --- /dev/null +++ b/migrations/3_values.sql @@ -0,0 +1,9 @@ +-- Add migration script here +CREATE TABLE Values ( + timestamp TIMESTAMP NOT NULL, + value FLOAT NOT NULL, + value_id INT NOT NULL, + device_id CHAR(32) NOT NULL, + active_errors INT NOT NULL, + FOREIGN KEY (device_id) REFERENCES Devices(ID) +); \ No newline at end of file diff --git a/src/database.rs b/src/database.rs index 9bf1791..e2d41f6 100644 --- a/src/database.rs +++ b/src/database.rs @@ -1,10 +1,10 @@ use actix_web::{cookie::time::error, web}; -use log::{error, info}; -use sqlx::{pool, postgres::PgPoolOptions, query, PgPool, Pool, Postgres, migrate, query_as}; -use thiserror::Error; use chrono::Utc; +use log::{error, info}; +use sqlx::{migrate, pool, postgres::PgPoolOptions, query, query_as, PgPool, Pool, Postgres}; +use thiserror::Error; -use crate::schemas::{TelemetryMessage, TelemetryMessageFromDevice}; +use crate::schemas::{TelemetryMessage, TelemetryMessageFromDevice, ValueMessageFromDevice, ValueMessage}; #[derive(Clone)] pub struct Database { @@ -14,7 +14,7 @@ pub struct Database { #[derive(Error, Debug)] pub enum DatabaseError { #[error("Generic SQLX Error")] - SqlxError(#[from] sqlx::Error) + SqlxError(#[from] sqlx::Error), } impl Database { @@ -38,8 +38,8 @@ impl Database { // Check if the necessary tables exist. If not, create them. TODO auto-migration pub async fn init_db(&self) { info!("Checking if required tables exist"); - match migrate!().run(&self.conn_pool).await{ - Ok(_) => {}, + match migrate!().run(&self.conn_pool).await { + Ok(_) => {} Err(e) => { error!("Error when running migrations {}", e); std::process::exit(1); @@ -49,17 +49,18 @@ impl Database { pub async fn add_device(&self, device_id: &str) -> Result<(), DatabaseError> { info!("Adding device with the ID {}", &device_id); - query!("INSERT INTO Devices (ID) VALUES ($1);", device_id).execute(&self.conn_pool).await?; + query!("INSERT INTO Devices (ID) VALUES ($1);", device_id) + .execute(&self.conn_pool) + .await?; Ok(()) } pub async fn create_device_if_not_exists(&self, device_id: &str) -> Result<(), DatabaseError> { info!("Checking if device with the ID {} exists", &device_id); - let exists_result = - query!("SELECT count(*) FROM devices WHERE ID = $1;", device_id) - .fetch_one(&self.conn_pool) - .await; - + let exists_result = query!("SELECT count(*) FROM devices WHERE ID = $1;", device_id) + .fetch_one(&self.conn_pool) + .await; + let exists = match exists_result { Ok(res) => res.count > Some(0), Err(err) => { @@ -68,7 +69,7 @@ impl Database { } }; - if exists{ + if exists { info!("Device exists"); Ok(()) } else { @@ -78,7 +79,11 @@ impl Database { } } - pub async fn add_telemetry(&self, msg: &web::Json, device_id: &str) -> Result<(), DatabaseError> { + pub async fn add_telemetry( + &self, + msg: &web::Json, + device_id: &str, + ) -> Result<(), DatabaseError> { info!("Adding telemetry message to DB"); let current_timestamp = Utc::now().naive_utc(); query!("INSERT INTO Telemetry (timestamp, software_version, voltage, temperature, uptime, device_id) @@ -88,15 +93,51 @@ impl Database { Ok(()) } - pub async fn get_telemetry_for_id(&self, device_id: &str) -> Result, DatabaseError> { + pub async fn get_telemetry_for_id( + &self, + device_id: &str, + ) -> Result, DatabaseError> { info!("Getting telemetry messages for {} from DB", &device_id); - let messages = query_as!(TelemetryMessage, + let messages = query_as!( + TelemetryMessage, "SELECT timestamp, software_version, voltage, temperature, uptime FROM Telemetry - WHERE device_id = $1 ORDER BY timestamp DESC;", &device_id) - .fetch_all(&self.conn_pool).await?; - + WHERE device_id = $1 ORDER BY timestamp DESC;", + &device_id + ) + .fetch_all(&self.conn_pool) + .await?; Ok(messages) } + + pub async fn add_value( + &self, + msg: &web::Json, + device_id: &str, + ) -> Result<(), DatabaseError> { + info!("Adding value to DB"); + let current_timestamp = Utc::now().naive_utc(); + query!(" + INSERT INTO values (timestamp, value, device_id, active_errors, value_id) + VALUES ($1, $2, $3, $4, $5);", + current_timestamp, msg.value, device_id, msg.active_errors, msg.value_id).execute(&self.conn_pool).await?; + + Ok(()) + } + + pub async fn get_values_for_id(&self, device_id: &str) -> Result, DatabaseError> { + info!("Getting values for {} from DB", &device_id); + let values = query_as!( + ValueMessage, + "SELECT timestamp, value, value_id + FROM Values + WHERE device_id = $1 ORDER BY timestamp DESC;", + &device_id + ) + .fetch_all(&self.conn_pool) + .await?; + + Ok(values) + } } diff --git a/src/main.rs b/src/main.rs index e6d3010..5a73ee6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ -use actix_web::{post, web, App, HttpServer, Responder, http::StatusCode, HttpResponse, get}; +use crate::schemas::{TelemetryMessageFromDevice, ValueMessageFromDevice}; +use actix_web::{get, http::StatusCode, post, web, App, HttpResponse, HttpServer, Responder}; use database::Database; -use log::{info, error, debug}; -use crate::schemas::TelemetryMessageFromDevice; +use log::{debug, error, info}; mod database; mod schemas; @@ -14,18 +14,18 @@ struct AppState { async fn receive_telemetry( device_id: web::Path, data: web::Data, - telemetry_message: web::Json + telemetry_message: web::Json, ) -> impl Responder { info!("POST - telementry - Processing device id {}", device_id); - match data.db.create_device_if_not_exists(&device_id).await{ - Ok(_) => {}, + match data.db.create_device_if_not_exists(&device_id).await { + Ok(_) => {} Err(e) => { error!("Error creating new device: {}", e); return HttpResponse::InternalServerError(); } }; - match data.db.add_telemetry(&telemetry_message, &device_id).await{ + match data.db.add_telemetry(&telemetry_message, &device_id).await { Ok(_) => HttpResponse::Created(), Err(e) => { error!("adding Telemetry message to DB failed \n{}", e); @@ -35,22 +35,53 @@ async fn receive_telemetry( } #[get("/telemetry/{device_id}")] -async fn get_telemetry( - device_id: web::Path, - data: web::Data -) -> impl Responder { +async fn get_telemetry(device_id: web::Path, data: web::Data) -> impl Responder { info!("GET - telementry - Processing device id {}", device_id); - let messages = match data.db.get_telemetry_for_id(&device_id).await{ + let messages = match data.db.get_telemetry_for_id(&device_id).await { Ok(msgs) => msgs, Err(e) => { error!("Getting Telemetry Messages from DB failed \n{}", e); - return HttpResponse::InternalServerError().finish() + return HttpResponse::InternalServerError().finish(); + } + }; + HttpResponse::Ok().json(messages) +} + +#[post("/value/{device_id}")] +async fn receive_value( + device_id: web::Path, + data: web::Data, + value_message: web::Json, +) -> impl Responder { + info!("POST - value - Processing device id {}", device_id); + match data.db.create_device_if_not_exists(&device_id).await { + Ok(_) => {} + Err(e) => { + error!("Error creating new device: {}", e); + return HttpResponse::InternalServerError(); } }; + match data.db.add_value(&value_message, &device_id).await { + Ok(_) => HttpResponse::Created(), + Err(e) => { + error!("adding Telemetry message to DB failed \n{}", e); + HttpResponse::InternalServerError() + } + } +} +#[get("/value/{device_id}")] +async fn get_value(device_id: web::Path, data: web::Data) -> impl Responder { + info!("GET - value - Processing device id {}", device_id); + let messages = match data.db.get_values_for_id(&device_id).await { + Ok(msgs) => msgs, + Err(e) => { + error!("Getting Values from DB failed \n{}", e); + return HttpResponse::InternalServerError().finish(); + } + }; HttpResponse::Ok().json(messages) - } #[actix_web::main] @@ -67,6 +98,8 @@ async fn main() -> std::io::Result<()> { .app_data(web::Data::new(AppState { db: db.clone() })) .service(receive_telemetry) .service(get_telemetry) + .service(receive_value) + .service(get_value) }) .bind(("127.0.0.1", 8080))? .run() diff --git a/src/schemas.rs b/src/schemas.rs index bcb53d1..923682c 100644 --- a/src/schemas.rs +++ b/src/schemas.rs @@ -7,7 +7,7 @@ pub struct TelemetryMessage { pub voltage: Option, pub temperature: Option, pub software_version: i32, - pub timestamp: NaiveDateTime + pub timestamp: NaiveDateTime, } #[derive(Deserialize, Debug, Serialize)] @@ -16,4 +16,18 @@ pub struct TelemetryMessageFromDevice { pub voltage: Option, pub temperature: Option, pub software_version: i32, -} \ No newline at end of file +} + +#[derive(Deserialize, Debug, Serialize)] +pub struct ValueMessageFromDevice { + pub value: f64, + pub value_id: i32, + pub active_errors: i32, +} + +#[derive(Deserialize, Debug, Serialize)] +pub struct ValueMessage { + pub value: f64, + pub value_id: i32, + pub timestamp: NaiveDateTime, +}