Files
iot-cloud-api/src/database.rs
2023-09-18 20:40:20 +00:00

103 lines
3.5 KiB
Rust

use actix_web::{cookie::time::error, web};
use log::{error, info};
use sqlx::{pool, postgres::PgPoolOptions, query, PgPool, Pool, Postgres, migrate, query_as};
use thiserror::Error;
use chrono::Utc;
use crate::schemas::{TelemetryMessage, TelemetryMessageFromDevice};
#[derive(Clone)]
pub struct Database {
conn_pool: Pool<Postgres>,
}
#[derive(Error, Debug)]
pub enum DatabaseError {
#[error("Generic SQLX Error")]
SqlxError(#[from] sqlx::Error)
}
impl Database {
pub async fn init(host: &str, user: &str, pass: &str, db_name: &str) -> Database {
match PgPoolOptions::new()
.max_connections(10)
.connect(&format!("postgres://{user}:{pass}@{host}/{db_name}"))
.await
{
Ok(pool) => {
info!("Connection to the database is successful!");
Database { conn_pool: pool }
}
Err(err) => {
error!("Failed to connect to the database: {:?}", err);
std::process::exit(1);
}
}
}
// Check if the necessary tables exist. If not, create them. TODO auto-migration
pub async fn init_db(&self) {
info!("Checking if required tables exist");
match migrate!().run(&self.conn_pool).await{
Ok(_) => {},
Err(e) => {
error!("Error when running migrations {}", e);
std::process::exit(1);
}
};
}
pub async fn add_device(&self, device_id: &str) -> Result<(), DatabaseError> {
info!("Adding device with the ID {}", &device_id);
query!("INSERT INTO Devices (ID) VALUES ($1);", device_id).execute(&self.conn_pool).await?;
Ok(())
}
pub async fn create_device_if_not_exists(&self, device_id: &str) -> Result<(), DatabaseError> {
info!("Checking if device with the ID {} exists", &device_id);
let exists_result =
query!("SELECT count(*) FROM devices WHERE ID = $1;", device_id)
.fetch_one(&self.conn_pool)
.await;
let exists = match exists_result {
Ok(res) => res.count > Some(0),
Err(err) => {
error!("Error checking table existence: {:?}", err);
std::process::exit(1);
}
};
if exists{
info!("Device exists");
Ok(())
} else {
info!("Device does not exist");
self.add_device(device_id).await?;
Ok(())
}
}
pub async fn add_telemetry(&self, msg: &web::Json<TelemetryMessageFromDevice>, device_id: &str) -> Result<(), DatabaseError> {
info!("Adding telemetry message to DB");
let current_timestamp = Utc::now().naive_utc();
query!("INSERT INTO Telemetry (timestamp, software_version, voltage, temperature, uptime, device_id)
VALUES ($1, $2, $3, $4, $5, $6);",
current_timestamp, msg.software_version, msg.voltage, msg.temperature, msg.uptime, device_id
).execute(&self.conn_pool).await?;
Ok(())
}
pub async fn get_telemetry_for_id(&self, device_id: &str) -> Result<Vec<TelemetryMessage>, DatabaseError> {
info!("Getting telemetry messages for {} from DB", &device_id);
let messages = query_as!(TelemetryMessage,
"SELECT timestamp, software_version, voltage, temperature, uptime
FROM Telemetry
WHERE device_id = $1 ORDER BY timestamp DESC;", &device_id)
.fetch_all(&self.conn_pool).await?;
Ok(messages)
}
}