Finished values and telemetry

This commit is contained in:
2023-09-19 19:58:48 +00:00
parent cd0581973d
commit 3b8926fb61
5 changed files with 134 additions and 37 deletions

View File

@@ -1,10 +1,10 @@
use actix_web::{cookie::time::error, web};
use log::{error, info};
use sqlx::{pool, postgres::PgPoolOptions, query, PgPool, Pool, Postgres, migrate, query_as};
use thiserror::Error;
use chrono::Utc;
use log::{error, info};
use sqlx::{migrate, pool, postgres::PgPoolOptions, query, query_as, PgPool, Pool, Postgres};
use thiserror::Error;
use crate::schemas::{TelemetryMessage, TelemetryMessageFromDevice};
use crate::schemas::{TelemetryMessage, TelemetryMessageFromDevice, ValueMessageFromDevice, ValueMessage};
#[derive(Clone)]
pub struct Database {
@@ -14,7 +14,7 @@ pub struct Database {
#[derive(Error, Debug)]
pub enum DatabaseError {
#[error("Generic SQLX Error")]
SqlxError(#[from] sqlx::Error)
SqlxError(#[from] sqlx::Error),
}
impl Database {
@@ -38,8 +38,8 @@ impl Database {
// Check if the necessary tables exist. If not, create them. TODO auto-migration
pub async fn init_db(&self) {
info!("Checking if required tables exist");
match migrate!().run(&self.conn_pool).await{
Ok(_) => {},
match migrate!().run(&self.conn_pool).await {
Ok(_) => {}
Err(e) => {
error!("Error when running migrations {}", e);
std::process::exit(1);
@@ -49,17 +49,18 @@ impl Database {
pub async fn add_device(&self, device_id: &str) -> Result<(), DatabaseError> {
info!("Adding device with the ID {}", &device_id);
query!("INSERT INTO Devices (ID) VALUES ($1);", device_id).execute(&self.conn_pool).await?;
query!("INSERT INTO Devices (ID) VALUES ($1);", device_id)
.execute(&self.conn_pool)
.await?;
Ok(())
}
pub async fn create_device_if_not_exists(&self, device_id: &str) -> Result<(), DatabaseError> {
info!("Checking if device with the ID {} exists", &device_id);
let exists_result =
query!("SELECT count(*) FROM devices WHERE ID = $1;", device_id)
.fetch_one(&self.conn_pool)
.await;
let exists_result = query!("SELECT count(*) FROM devices WHERE ID = $1;", device_id)
.fetch_one(&self.conn_pool)
.await;
let exists = match exists_result {
Ok(res) => res.count > Some(0),
Err(err) => {
@@ -68,7 +69,7 @@ impl Database {
}
};
if exists{
if exists {
info!("Device exists");
Ok(())
} else {
@@ -78,7 +79,11 @@ impl Database {
}
}
pub async fn add_telemetry(&self, msg: &web::Json<TelemetryMessageFromDevice>, device_id: &str) -> Result<(), DatabaseError> {
pub async fn add_telemetry(
&self,
msg: &web::Json<TelemetryMessageFromDevice>,
device_id: &str,
) -> Result<(), DatabaseError> {
info!("Adding telemetry message to DB");
let current_timestamp = Utc::now().naive_utc();
query!("INSERT INTO Telemetry (timestamp, software_version, voltage, temperature, uptime, device_id)
@@ -88,15 +93,51 @@ impl Database {
Ok(())
}
pub async fn get_telemetry_for_id(&self, device_id: &str) -> Result<Vec<TelemetryMessage>, DatabaseError> {
pub async fn get_telemetry_for_id(
&self,
device_id: &str,
) -> Result<Vec<TelemetryMessage>, DatabaseError> {
info!("Getting telemetry messages for {} from DB", &device_id);
let messages = query_as!(TelemetryMessage,
let messages = query_as!(
TelemetryMessage,
"SELECT timestamp, software_version, voltage, temperature, uptime
FROM Telemetry
WHERE device_id = $1 ORDER BY timestamp DESC;", &device_id)
.fetch_all(&self.conn_pool).await?;
WHERE device_id = $1 ORDER BY timestamp DESC;",
&device_id
)
.fetch_all(&self.conn_pool)
.await?;
Ok(messages)
}
pub async fn add_value(
&self,
msg: &web::Json<ValueMessageFromDevice>,
device_id: &str,
) -> Result<(), DatabaseError> {
info!("Adding value to DB");
let current_timestamp = Utc::now().naive_utc();
query!("
INSERT INTO values (timestamp, value, device_id, active_errors, value_id)
VALUES ($1, $2, $3, $4, $5);",
current_timestamp, msg.value, device_id, msg.active_errors, msg.value_id).execute(&self.conn_pool).await?;
Ok(())
}
pub async fn get_values_for_id(&self, device_id: &str) -> Result<Vec<ValueMessage>, DatabaseError> {
info!("Getting values for {} from DB", &device_id);
let values = query_as!(
ValueMessage,
"SELECT timestamp, value, value_id
FROM Values
WHERE device_id = $1 ORDER BY timestamp DESC;",
&device_id
)
.fetch_all(&self.conn_pool)
.await?;
Ok(values)
}
}