Files
iot-cloud-api/src/database.rs
Tobias Maier add82496a6
Some checks failed
Build Project / test (push) Failing after 9m3s
Fixed clippy, updated dependencies
2025-10-17 15:58:52 +00:00

245 lines
7.0 KiB
Rust

use actix_web::web;
use chrono::Utc;
use log::{error, info};
use sqlx::{
migrate, postgres::PgPoolOptions, query, query_as, types::mac_address::MacAddress, Pool,
Postgres,
};
use thiserror::Error;
use crate::schemas::{
Device, TelemetryMessage, TelemetryMessageFromDevice, ValueMessage, ValueMessageFromDevice,
};
#[derive(Clone)]
pub struct Database {
conn_pool: Pool<Postgres>,
}
#[derive(Error, Debug)]
pub enum DatabaseError {
#[error("Generic SQLX Error")]
SqlxError(#[from] sqlx::Error),
}
impl Database {
pub async fn init(database_url: &str) -> Database {
match PgPoolOptions::new()
.max_connections(10)
.connect(database_url)
.await
{
Ok(pool) => {
info!("Connection to the database is successful!");
Database { conn_pool: pool }
}
Err(err) => {
error!("Failed to connect to the database: {err:?}");
std::process::exit(1);
}
}
}
#[cfg(test)]
pub fn init_from_pool(pool: Pool<Postgres>) -> Database {
Database { conn_pool: pool }
}
// Check if the necessary tables exist. If not, create them. TODO auto-migration
pub async fn init_db(&self) {
info!("Checking if required tables exist");
match migrate!().run(&self.conn_pool).await {
Ok(()) => {}
Err(e) => {
error!("Error when running migrations {e}");
std::process::exit(1);
}
};
}
pub async fn add_device(&self, device_id: &MacAddress) -> Result<(), DatabaseError> {
info!("Adding device with the ID {}", &device_id);
query!("INSERT INTO Devices (ID) VALUES ($1);", device_id)
.execute(&self.conn_pool)
.await?;
Ok(())
}
#[allow(dead_code)]
pub async fn update_display_name(
&self,
device_id: &MacAddress,
display_name: &str,
) -> Result<(), DatabaseError> {
info!("Updating Displayname to {display_name} for Device with ID {device_id}");
query!(
"UPDATE Devices SET display_name = $1 WHERE id = $2;",
display_name,
device_id
)
.execute(&self.conn_pool)
.await?;
Ok(())
}
pub async fn create_device_if_not_exists(
&self,
device_id: &MacAddress,
) -> Result<(), DatabaseError> {
info!("Checking if device with the ID {} exists", &device_id);
let exists_result = query!("SELECT count(*) FROM devices WHERE ID = $1;", device_id)
.fetch_one(&self.conn_pool)
.await;
let exists = match exists_result {
Ok(res) => res.count > Some(0),
Err(err) => {
error!("Error checking table existence: {err:?}");
std::process::exit(1);
}
};
if exists {
info!("Device exists");
Ok(())
} else {
info!("Device does not exist");
self.add_device(device_id).await?;
Ok(())
}
}
pub async fn add_telemetry(
&self,
msg: &web::Json<TelemetryMessageFromDevice>,
device_id: &MacAddress,
) -> Result<(), DatabaseError> {
info!("Adding telemetry message to DB");
let current_timestamp = Utc::now().naive_utc();
query!("INSERT INTO Telemetry (timestamp, software_version, voltage, temperature, uptime, device_id)
VALUES ($1, $2, $3, $4, $5, $6);",
current_timestamp, msg.software_version, msg.voltage, msg.temperature, msg.uptime, device_id
).execute(&self.conn_pool).await?;
Ok(())
}
pub async fn get_telemetry_for_id(
&self,
device_id: &MacAddress,
) -> Result<Vec<TelemetryMessage>, DatabaseError> {
info!("Getting telemetry messages for {} from DB", &device_id);
let messages = query_as!(
TelemetryMessage,
"SELECT timestamp, software_version, voltage, temperature, uptime
FROM Telemetry
WHERE device_id = $1 ORDER BY timestamp DESC;",
&device_id
)
.fetch_all(&self.conn_pool)
.await?;
Ok(messages)
}
pub async fn add_value(
&self,
msg: &ValueMessageFromDevice,
device_id: &MacAddress,
) -> Result<(), DatabaseError> {
info!("Adding value to DB");
let current_timestamp = Utc::now().naive_utc();
query!(
"
INSERT INTO values (timestamp, value, device_id, active_errors, value_id)
VALUES ($1, $2, $3, $4, $5);",
current_timestamp,
msg.value,
device_id,
msg.active_errors,
msg.value_id
)
.execute(&self.conn_pool)
.await?;
Ok(())
}
pub async fn get_values_for_id(
&self,
device_id: &MacAddress,
) -> Result<Vec<ValueMessage>, DatabaseError> {
info!("Getting values for {} from DB", &device_id);
let values = query_as!(
ValueMessage,
"SELECT timestamp, value, value_id
FROM Values
WHERE device_id = $1 ORDER BY timestamp DESC;",
&device_id
)
.fetch_all(&self.conn_pool)
.await?;
Ok(values)
}
pub async fn get_devices(&self) -> Result<Vec<Device>, DatabaseError> {
info!("Getting all devices");
let devices = query_as!(
Device,
"SELECT id, display_name
FROM Devices;",
)
.fetch_all(&self.conn_pool)
.await?;
Ok(devices)
}
}
#[cfg(test)]
mod tests {
use super::*;
use sqlx::PgPool;
#[sqlx::test]
async fn add_device_and_display_name(pool: PgPool) {
let db = Database::init_from_pool(pool);
let test_device = Device {
display_name: Some("Waterlevel daheim".to_owned()),
id: MacAddress::from([0x1A, 0x2B, 0x3C, 0x4D, 0x5E, 0x6F]),
};
db.add_device(&MacAddress::from([0x1A, 0x2B, 0x3C, 0x4D, 0x5E, 0x6F]))
.await
.unwrap();
db.update_display_name(
&MacAddress::from([0x1A, 0x2B, 0x3C, 0x4D, 0x5E, 0x6F]),
"Waterlevel daheim",
)
.await
.unwrap();
let devices = db.get_devices().await.unwrap();
assert_eq!(test_device, devices[0]);
}
#[sqlx::test]
async fn add_value(pool: PgPool) {
let db = Database::init_from_pool(pool);
let device_id = MacAddress::from([0x1A, 0x2B, 0x3C, 0x4D, 0x5E, 0x6F]);
db.add_device(&device_id).await.unwrap();
let msg = ValueMessageFromDevice {
active_errors: 0,
value: 112.0,
value_id: 1,
};
db.add_value(&msg, &device_id).await.unwrap();
let values = db.get_values_for_id(&device_id).await.unwrap();
assert!((values[0].value - msg.value).abs() < 1e-5);
assert_eq!(values[0].value_id, msg.value_id);
}
}