use actix_web::web; use chrono::Utc; use log::{error, info}; use sqlx::{ migrate, postgres::PgPoolOptions, query, query_as, types::mac_address::MacAddress, Pool, Postgres, }; use thiserror::Error; use crate::schemas::{ Device, TelemetryMessage, TelemetryMessageFromDevice, ValueMessage, ValueMessageFromDevice, }; #[derive(Clone)] pub struct Database { conn_pool: Pool, } #[derive(Error, Debug)] pub enum DatabaseError { #[error("Generic SQLX Error")] SqlxError(#[from] sqlx::Error), } impl Database { pub async fn init(database_url: &str) -> Database { match PgPoolOptions::new() .max_connections(10) .connect(database_url) .await { Ok(pool) => { info!("Connection to the database is successful!"); Database { conn_pool: pool } } Err(err) => { error!("Failed to connect to the database: {err:?}"); std::process::exit(1); } } } #[cfg(test)] pub fn init_from_pool(pool: Pool) -> Database { Database { conn_pool: pool } } // Check if the necessary tables exist. If not, create them. TODO auto-migration pub async fn init_db(&self) { info!("Checking if required tables exist"); match migrate!().run(&self.conn_pool).await { Ok(()) => {} Err(e) => { error!("Error when running migrations {e}"); std::process::exit(1); } }; } pub async fn add_device(&self, device_id: &MacAddress) -> Result<(), DatabaseError> { info!("Adding device with the ID {}", &device_id); query!("INSERT INTO Devices (ID) VALUES ($1);", device_id) .execute(&self.conn_pool) .await?; Ok(()) } #[allow(dead_code)] pub async fn update_display_name( &self, device_id: &MacAddress, display_name: &str, ) -> Result<(), DatabaseError> { info!("Updating Displayname to {display_name} for Device with ID {device_id}"); query!( "UPDATE Devices SET display_name = $1 WHERE id = $2;", display_name, device_id ) .execute(&self.conn_pool) .await?; Ok(()) } pub async fn create_device_if_not_exists( &self, device_id: &MacAddress, ) -> Result<(), DatabaseError> { info!("Checking if device with the ID {} exists", &device_id); let exists_result = query!("SELECT count(*) FROM devices WHERE ID = $1;", device_id) .fetch_one(&self.conn_pool) .await; let exists = match exists_result { Ok(res) => res.count > Some(0), Err(err) => { error!("Error checking table existence: {err:?}"); std::process::exit(1); } }; if exists { info!("Device exists"); Ok(()) } else { info!("Device does not exist"); self.add_device(device_id).await?; Ok(()) } } pub async fn add_telemetry( &self, msg: &web::Json, device_id: &MacAddress, ) -> Result<(), DatabaseError> { info!("Adding telemetry message to DB"); let current_timestamp = Utc::now().naive_utc(); query!("INSERT INTO Telemetry (timestamp, software_version, voltage, temperature, uptime, device_id) VALUES ($1, $2, $3, $4, $5, $6);", current_timestamp, msg.software_version, msg.voltage, msg.temperature, msg.uptime, device_id ).execute(&self.conn_pool).await?; Ok(()) } pub async fn get_telemetry_for_id( &self, device_id: &MacAddress, ) -> Result, DatabaseError> { info!("Getting telemetry messages for {} from DB", &device_id); let messages = query_as!( TelemetryMessage, "SELECT timestamp, software_version, voltage, temperature, uptime FROM Telemetry WHERE device_id = $1 ORDER BY timestamp DESC;", &device_id ) .fetch_all(&self.conn_pool) .await?; Ok(messages) } pub async fn add_value( &self, msg: &ValueMessageFromDevice, device_id: &MacAddress, ) -> Result<(), DatabaseError> { info!("Adding value to DB"); let current_timestamp = Utc::now().naive_utc(); query!( " INSERT INTO values (timestamp, value, device_id, active_errors, value_id) VALUES ($1, $2, $3, $4, $5);", current_timestamp, msg.value, device_id, msg.active_errors, msg.value_id ) .execute(&self.conn_pool) .await?; Ok(()) } pub async fn get_values_for_id( &self, device_id: &MacAddress, ) -> Result, DatabaseError> { info!("Getting values for {} from DB", &device_id); let values = query_as!( ValueMessage, "SELECT timestamp, value, value_id FROM Values WHERE device_id = $1 ORDER BY timestamp DESC;", &device_id ) .fetch_all(&self.conn_pool) .await?; Ok(values) } pub async fn get_devices(&self) -> Result, DatabaseError> { info!("Getting all devices"); let devices = query_as!( Device, "SELECT id, display_name FROM Devices;", ) .fetch_all(&self.conn_pool) .await?; Ok(devices) } } #[cfg(test)] mod tests { use super::*; use sqlx::PgPool; #[sqlx::test] async fn add_device_and_display_name(pool: PgPool) { let db = Database::init_from_pool(pool); let test_device = Device { display_name: Some("Waterlevel daheim".to_owned()), id: MacAddress::from([0x1A, 0x2B, 0x3C, 0x4D, 0x5E, 0x6F]), }; db.add_device(&MacAddress::from([0x1A, 0x2B, 0x3C, 0x4D, 0x5E, 0x6F])) .await .unwrap(); db.update_display_name( &MacAddress::from([0x1A, 0x2B, 0x3C, 0x4D, 0x5E, 0x6F]), "Waterlevel daheim", ) .await .unwrap(); let devices = db.get_devices().await.unwrap(); assert_eq!(test_device, devices[0]); } #[sqlx::test] async fn add_value(pool: PgPool) { let db = Database::init_from_pool(pool); let device_id = MacAddress::from([0x1A, 0x2B, 0x3C, 0x4D, 0x5E, 0x6F]); db.add_device(&device_id).await.unwrap(); let msg = ValueMessageFromDevice { active_errors: 0, value: 112.0, value_id: 1, }; db.add_value(&msg, &device_id).await.unwrap(); let values = db.get_values_for_id(&device_id).await.unwrap(); assert!((values[0].value - msg.value).abs() < 1e-5); assert_eq!(values[0].value_id, msg.value_id); } }