diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index c68acad..8956da0 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -33,7 +33,7 @@ "customizations": { "vscode": { "extensions": [ - "serayuzgur.crates", + "fill-labs.dependi", "tamasfe.even-better-toml", "vadimcn.vscode-lldb", "mutantdino.resourcemonitor", diff --git a/Cargo.toml b/Cargo.toml index 510997c..42aecb2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,9 +8,10 @@ edition = "2021" [dependencies] actix-web = "4.4.0" chrono = { version = "0.4.31", features = ["serde"] } -env_logger = "0.10.0" +dotenvy = "0.15" +env_logger = "0.11" log = "0.4.20" serde = "1.0.188" -sqlx = { version = "0.7", features = [ "runtime-tokio", "tls-rustls", "postgres", "migrate", "chrono"] } -sqlx-cli = "0.7.1" -thiserror = "1.0.48" +sqlx = { version = "0.8", features = [ "runtime-tokio", "tls-rustls", "postgres", "migrate", "chrono", "mac_address"] } +sqlx-cli = "0.8" +thiserror = "1.0" diff --git a/migrations/1_devices.sql b/migrations/1_devices.sql index 1227047..67a86e8 100644 --- a/migrations/1_devices.sql +++ b/migrations/1_devices.sql @@ -1,5 +1,5 @@ -- Add migration script here CREATE TABLE Devices ( - ID CHAR(32) PRIMARY KEY, + ID MACADDR PRIMARY KEY, display_name VARCHAR(255) ); \ No newline at end of file diff --git a/migrations/2_telemetry.sql b/migrations/2_telemetry.sql index 84bd4d9..f511e41 100644 --- a/migrations/2_telemetry.sql +++ b/migrations/2_telemetry.sql @@ -5,6 +5,6 @@ CREATE TABLE Telemetry ( Voltage FLOAT, Temperature FLOAT, uptime INT NOT NULL, - device_id CHAR(32) NOT NULL, + device_id MACADDR NOT NULL, FOREIGN KEY (device_id) REFERENCES Devices(ID) ); \ No newline at end of file diff --git a/migrations/3_values.sql b/migrations/3_values.sql index 0f93283..7d5b54b 100644 --- a/migrations/3_values.sql +++ b/migrations/3_values.sql @@ -3,7 +3,7 @@ CREATE TABLE Values ( timestamp TIMESTAMP NOT NULL, value FLOAT NOT NULL, value_id INT NOT NULL, - device_id CHAR(32) NOT NULL, + device_id MACADDR NOT NULL, active_errors INT NOT NULL, FOREIGN KEY (device_id) REFERENCES Devices(ID) ); \ No newline at end of file diff --git a/src/database.rs b/src/database.rs index e8d0496..e2e437e 100644 --- a/src/database.rs +++ b/src/database.rs @@ -1,7 +1,7 @@ -use actix_web::{cookie::time::error, web}; +use actix_web::web; use chrono::Utc; use log::{error, info}; -use sqlx::{migrate, pool, postgres::PgPoolOptions, query, query_as, PgPool, Pool, Postgres}; +use sqlx::{migrate, postgres::PgPoolOptions, query, query_as, types::mac_address::MacAddress, Pool, Postgres}; use thiserror::Error; use crate::schemas::{TelemetryMessage, TelemetryMessageFromDevice, ValueMessageFromDevice, ValueMessage, Device}; @@ -35,6 +35,10 @@ impl Database { } } + pub async fn init_from_pool(pool: Pool) -> Database { + Database { conn_pool: pool } + } + // Check if the necessary tables exist. If not, create them. TODO auto-migration pub async fn init_db(&self) { info!("Checking if required tables exist"); @@ -47,7 +51,7 @@ impl Database { }; } - pub async fn add_device(&self, device_id: &str) -> Result<(), DatabaseError> { + pub async fn add_device(&self, device_id: &MacAddress) -> Result<(), DatabaseError> { info!("Adding device with the ID {}", &device_id); query!("INSERT INTO Devices (ID) VALUES ($1);", device_id) .execute(&self.conn_pool) @@ -55,7 +59,15 @@ impl Database { Ok(()) } - pub async fn create_device_if_not_exists(&self, device_id: &str) -> Result<(), DatabaseError> { + pub async fn add_display_name(&self, device_id: &MacAddress, display_name: &str) -> Result<(), DatabaseError> { + info!("Adding Displayname {display_name} to Device with ID {device_id}"); + query!("UPDATE Devices SET display_name = $1 WHERE id = $2;", display_name, device_id) + .execute(&self.conn_pool) + .await?; + Ok(()) + } + + pub async fn create_device_if_not_exists(&self, device_id: &MacAddress) -> Result<(), DatabaseError> { info!("Checking if device with the ID {} exists", &device_id); let exists_result = query!("SELECT count(*) FROM devices WHERE ID = $1;", device_id) .fetch_one(&self.conn_pool) @@ -82,7 +94,7 @@ impl Database { pub async fn add_telemetry( &self, msg: &web::Json, - device_id: &str, + device_id: &MacAddress, ) -> Result<(), DatabaseError> { info!("Adding telemetry message to DB"); let current_timestamp = Utc::now().naive_utc(); @@ -95,7 +107,7 @@ impl Database { pub async fn get_telemetry_for_id( &self, - device_id: &str, + device_id: &MacAddress, ) -> Result, DatabaseError> { info!("Getting telemetry messages for {} from DB", &device_id); let messages = query_as!( @@ -114,7 +126,7 @@ impl Database { pub async fn add_value( &self, msg: &web::Json, - device_id: &str, + device_id: &MacAddress, ) -> Result<(), DatabaseError> { info!("Adding value to DB"); let current_timestamp = Utc::now().naive_utc(); @@ -126,7 +138,7 @@ impl Database { Ok(()) } - pub async fn get_values_for_id(&self, device_id: &str) -> Result, DatabaseError> { + pub async fn get_values_for_id(&self, device_id: &MacAddress) -> Result, DatabaseError> { info!("Getting values for {} from DB", &device_id); let values = query_as!( ValueMessage, @@ -154,3 +166,27 @@ impl Database { Ok(devices) } } + +#[cfg(test)] +mod tests { + use super::*; + use dotenvy::dotenv; + use sqlx::PgPool; + + #[sqlx::test] + async fn add_device(pool: PgPool) { + dotenv().ok(); + let db = Database::init_from_pool(pool).await; + + let test_device = Device{ + display_name: Some("Waterlevel daheim".to_owned()), + id: MacAddress::from([0x1A, 0x2B, 0x3C, 0x4D, 0x5E, 0x6F]) + }; + db.add_device(&MacAddress::from([0x1A, 0x2B, 0x3C, 0x4D, 0x5E, 0x6F])).await.unwrap(); + db.add_display_name(&MacAddress::from([0x1A, 0x2B, 0x3C, 0x4D, 0x5E, 0x6F]), "Waterlevel daheim").await.unwrap(); + + let devices = db.get_devices().await.unwrap(); + assert_eq!(test_device, devices[0]); + + } +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 4fae69f..06ac504 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,15 @@ use std::{env, process}; use crate::schemas::{TelemetryMessageFromDevice, ValueMessageFromDevice}; -use actix_web::{get, http::StatusCode, post, web, App, HttpResponse, HttpServer, Responder}; +use actix_web::{get, post, web, App, HttpResponse, HttpServer, Responder}; use database::Database; -use log::{debug, error, info}; +use log::{error, info}; +use sqlx::types::mac_address::MacAddress; +use util::parse_mac_address; mod database; mod schemas; +mod util; struct AppState { db: Database, @@ -19,7 +22,11 @@ async fn receive_telemetry( telemetry_message: web::Json, ) -> impl Responder { info!("POST - telementry - Processing device id {}", device_id); - match data.db.create_device_if_not_exists(&device_id).await { + let Ok(mac_converted) = parse_mac_address(&device_id) else { + return HttpResponse::InternalServerError(); + }; + let mac_converted = MacAddress::from(mac_converted); + match data.db.create_device_if_not_exists(&mac_converted).await { Ok(_) => {} Err(e) => { error!("Error creating new device: {}", e); @@ -27,7 +34,7 @@ async fn receive_telemetry( } }; - match data.db.add_telemetry(&telemetry_message, &device_id).await { + match data.db.add_telemetry(&telemetry_message, &mac_converted).await { Ok(_) => HttpResponse::Created(), Err(e) => { error!("adding Telemetry message to DB failed \n{}", e); @@ -39,7 +46,11 @@ async fn receive_telemetry( #[get("/telemetry/{device_id}")] async fn get_telemetry(device_id: web::Path, data: web::Data) -> impl Responder { info!("GET - telementry - Processing device id {}", device_id); - let messages = match data.db.get_telemetry_for_id(&device_id).await { + let Ok(mac_converted) = parse_mac_address(&device_id) else { + return HttpResponse::InternalServerError().finish(); + }; + let mac_converted = MacAddress::from(mac_converted); + let messages = match data.db.get_telemetry_for_id(&mac_converted).await { Ok(msgs) => msgs, Err(e) => { error!("Getting Telemetry Messages from DB failed \n{}", e); @@ -56,7 +67,11 @@ async fn receive_value( value_message: web::Json, ) -> impl Responder { info!("POST - value - Processing device id {}", device_id); - match data.db.create_device_if_not_exists(&device_id).await { + let Ok(mac_converted) = parse_mac_address(&device_id) else { + return HttpResponse::InternalServerError(); + }; + let mac_converted = MacAddress::from(mac_converted); + match data.db.create_device_if_not_exists(&mac_converted).await { Ok(_) => {} Err(e) => { error!("Error creating new device: {}", e); @@ -64,7 +79,7 @@ async fn receive_value( } }; - match data.db.add_value(&value_message, &device_id).await { + match data.db.add_value(&value_message, &mac_converted).await { Ok(_) => HttpResponse::Created(), Err(e) => { error!("adding Telemetry message to DB failed \n{}", e); @@ -76,7 +91,11 @@ async fn receive_value( #[get("/value/{device_id}")] async fn get_value(device_id: web::Path, data: web::Data) -> impl Responder { info!("GET - value - Processing device id {}", device_id); - let messages = match data.db.get_values_for_id(&device_id).await { + let Ok(mac_converted) = parse_mac_address(&device_id) else { + return HttpResponse::InternalServerError().finish(); + }; + let mac_converted = MacAddress::from(mac_converted); + let messages = match data.db.get_values_for_id(&mac_converted).await { Ok(msgs) => msgs, Err(e) => { error!("Getting Values from DB failed \n{}", e); @@ -108,7 +127,7 @@ async fn main() -> std::io::Result<()> { let db_url = match env::var("DATABASE_URL") { Ok(url) => url, Err(e) => { - error!("Failed reading DATABASE_URL"); + error!("Failed reading DATABASE_URL: {e}"); process::exit(1); } }; diff --git a/src/schemas.rs b/src/schemas.rs index c3f578c..48339de 100644 --- a/src/schemas.rs +++ b/src/schemas.rs @@ -1,5 +1,6 @@ use chrono::NaiveDateTime; -use serde::{Deserialize, Serialize}; +use serde::{ser::SerializeStruct, Deserialize, Serialize}; +use sqlx::types::mac_address::MacAddress; #[derive(Deserialize, Debug, Serialize)] pub struct TelemetryMessage { @@ -32,8 +33,26 @@ pub struct ValueMessage { pub timestamp: NaiveDateTime, } -#[derive(Deserialize, Debug, Serialize)] +#[derive(Debug, PartialEq)] pub struct Device { pub display_name: Option, - pub id: String + pub id: MacAddress +} + + +impl Serialize for Device { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut state = serializer.serialize_struct("Device", 2)?; + + // Serialize each field with custom logic + let bytes = self.id.bytes(); + state.serialize_field("display_name", &self.display_name)?; + state.serialize_field("id", &format!("{}{}{}{}{}{}", bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5]))?; + + // End the serialization process + state.end() + } } \ No newline at end of file diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000..20c941d --- /dev/null +++ b/src/util.rs @@ -0,0 +1,71 @@ + + +use thiserror::Error; + + + +pub fn parse_mac_address(mac: &str) -> Result<[u8; 6], MacAddressError> { + if mac.len() != 12 { + return Err(MacAddressError::Length(mac.len())) + } + + let mut mac_bytes = [0u8; 6]; + for i in 0..6 { + let hex_part = &mac[i * 2..i * 2 + 2]; + mac_bytes[i] = u8::from_str_radix(hex_part, 16)?; + } + + Ok(mac_bytes) +} + +#[derive(Error, Debug)] +pub enum MacAddressError { + #[error("Error converting MAC Address")] + Conversion(#[from] std::num::ParseIntError), + #[error("MAC Address length mismatch")] + Length(usize), +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_valid_mac_address_plain() { + let mac_str = "001A2B3C4D5E"; + let expected = [0x00, 0x1A, 0x2B, 0x3C, 0x4D, 0x5E]; + assert_eq!(parse_mac_address(mac_str).unwrap(), expected); + } + + #[test] + fn test_valid_lowercase_mac_address() { + let mac_str = "001a2b3c4d5e"; + let expected = [0x00, 0x1A, 0x2B, 0x3C, 0x4D, 0x5E]; + assert_eq!(parse_mac_address(mac_str).unwrap(), expected); + } + + #[test] + fn test_invalid_mac_address_wrong_length() { + let mac_str = "001A2B3C4D"; + assert!(parse_mac_address(mac_str).is_err()); + } + + #[test] + fn test_invalid_mac_address_invalid_characters() { + let mac_str = "001A2B3C4DZZ"; + assert!(parse_mac_address(mac_str).is_err()); + } + + #[test] + fn test_empty_mac_address() { + let mac_str = ""; + assert!(parse_mac_address(mac_str).is_err()); + } + + #[test] + fn test_mac_address_with_extra_spaces() { + let mac_str = "001A2B3C 4D5E"; + assert!(parse_mac_address(mac_str).is_err()); + } +} +