Implemented Pull OTA json

This commit is contained in:
2025-01-04 22:29:43 +00:00
parent f76f248446
commit 9919495346
11 changed files with 194 additions and 35 deletions

View File

@@ -8,7 +8,7 @@ services:
- ..:/workspace:cached - ..:/workspace:cached
command: sleep infinity command: sleep infinity
ports: ports:
- 1234:8080 - 8282:8282
db: db:
image: postgres image: postgres

1
.gitignore vendored
View File

@@ -19,3 +19,4 @@ Cargo.lock
# Added by cargo # Added by cargo
/target /target
*.bin

View File

@@ -9,9 +9,12 @@ edition = "2021"
actix-web = "4.4.0" actix-web = "4.4.0"
chrono = { version = "0.4.31", features = ["serde"] } chrono = { version = "0.4.31", features = ["serde"] }
dotenvy = "0.15" dotenvy = "0.15"
enum_stringify = "0.6.1"
env_logger = "0.11" env_logger = "0.11"
log = "0.4.20" log = "0.4.20"
serde = "1.0.188" serde = "1.0.188"
sqlx = { version = "0.8", features = [ "runtime-tokio", "tls-rustls", "postgres", "migrate", "chrono", "mac_address"] } sqlx = { version = "0.8", features = [ "runtime-tokio", "tls-rustls", "postgres", "migrate", "chrono", "mac_address"] }
sqlx-cli = "0.8" sqlx-cli = "0.8"
strum = { version = "0.26.3", features = ["derive"] }
thiserror = "1.0" thiserror = "1.0"
tokio = { version = "1", features = ["fs", "rt-multi-thread"] }

View File

@@ -0,0 +1,15 @@
meta {
name: GET firmwares
type: http
seq: 4
}
get {
url: http://localhost:8484/firmware/waterlevel
body: multipartForm
auth: none
}
body:multipart-form {
: @file(/home/tobi/git/iot-cloud-api/target/debug/iot-cloud)
}

View File

@@ -0,0 +1,11 @@
meta {
name: Get Devices
type: http
seq: 2
}
get {
url: http://localhost:8484/device
body: none
auth: none
}

View File

@@ -0,0 +1,15 @@
meta {
name: PUT firmware
type: http
seq: 3
}
put {
url: http://localhost:8484/firmware/waterlevel/INA226/0.0.1
body: multipartForm
auth: none
}
body:multipart-form {
: @file(/home/tobi/git/iot-cloud-api/target/debug/iot-cloud)
}

9
dev/IoT-API/bruno.json Normal file
View File

@@ -0,0 +1,9 @@
{
"version": "1",
"name": "IoT-API",
"type": "collection",
"ignore": [
"node_modules",
".git"
]
}

View File

@@ -1,10 +1,15 @@
use actix_web::web; use actix_web::web;
use chrono::Utc; use chrono::Utc;
use log::{error, info}; use log::{error, info};
use sqlx::{migrate, postgres::PgPoolOptions, query, query_as, types::mac_address::MacAddress, Pool, Postgres}; use sqlx::{
migrate, postgres::PgPoolOptions, query, query_as, types::mac_address::MacAddress, Pool,
Postgres,
};
use thiserror::Error; use thiserror::Error;
use crate::schemas::{TelemetryMessage, TelemetryMessageFromDevice, ValueMessageFromDevice, ValueMessage, Device}; use crate::schemas::{
Device, TelemetryMessage, TelemetryMessageFromDevice, ValueMessage, ValueMessageFromDevice,
};
#[derive(Clone)] #[derive(Clone)]
pub struct Database { pub struct Database {
@@ -59,15 +64,26 @@ impl Database {
Ok(()) Ok(())
} }
pub async fn add_display_name(&self, device_id: &MacAddress, display_name: &str) -> Result<(), DatabaseError> { pub async fn add_display_name(
&self,
device_id: &MacAddress,
display_name: &str,
) -> Result<(), DatabaseError> {
info!("Adding Displayname {display_name} to Device with ID {device_id}"); info!("Adding Displayname {display_name} to Device with ID {device_id}");
query!("UPDATE Devices SET display_name = $1 WHERE id = $2;", display_name, device_id) query!(
.execute(&self.conn_pool) "UPDATE Devices SET display_name = $1 WHERE id = $2;",
.await?; display_name,
device_id
)
.execute(&self.conn_pool)
.await?;
Ok(()) Ok(())
} }
pub async fn create_device_if_not_exists(&self, device_id: &MacAddress) -> Result<(), DatabaseError> { pub async fn create_device_if_not_exists(
&self,
device_id: &MacAddress,
) -> Result<(), DatabaseError> {
info!("Checking if device with the ID {} exists", &device_id); info!("Checking if device with the ID {} exists", &device_id);
let exists_result = query!("SELECT count(*) FROM devices WHERE ID = $1;", device_id) let exists_result = query!("SELECT count(*) FROM devices WHERE ID = $1;", device_id)
.fetch_one(&self.conn_pool) .fetch_one(&self.conn_pool)
@@ -130,15 +146,26 @@ impl Database {
) -> Result<(), DatabaseError> { ) -> Result<(), DatabaseError> {
info!("Adding value to DB"); info!("Adding value to DB");
let current_timestamp = Utc::now().naive_utc(); let current_timestamp = Utc::now().naive_utc();
query!(" query!(
"
INSERT INTO values (timestamp, value, device_id, active_errors, value_id) INSERT INTO values (timestamp, value, device_id, active_errors, value_id)
VALUES ($1, $2, $3, $4, $5);", VALUES ($1, $2, $3, $4, $5);",
current_timestamp, msg.value, device_id, msg.active_errors, msg.value_id).execute(&self.conn_pool).await?; current_timestamp,
msg.value,
device_id,
msg.active_errors,
msg.value_id
)
.execute(&self.conn_pool)
.await?;
Ok(()) Ok(())
} }
pub async fn get_values_for_id(&self, device_id: &MacAddress) -> Result<Vec<ValueMessage>, DatabaseError> { pub async fn get_values_for_id(
&self,
device_id: &MacAddress,
) -> Result<Vec<ValueMessage>, DatabaseError> {
info!("Getting values for {} from DB", &device_id); info!("Getting values for {} from DB", &device_id);
let values = query_as!( let values = query_as!(
ValueMessage, ValueMessage,
@@ -175,18 +202,23 @@ mod tests {
#[sqlx::test] #[sqlx::test]
async fn add_device(pool: PgPool) { async fn add_device(pool: PgPool) {
dotenv().ok();
let db = Database::init_from_pool(pool).await; let db = Database::init_from_pool(pool).await;
let test_device = Device{ let test_device = Device {
display_name: Some("Waterlevel daheim".to_owned()), display_name: Some("Waterlevel daheim".to_owned()),
id: MacAddress::from([0x1A, 0x2B, 0x3C, 0x4D, 0x5E, 0x6F]) id: MacAddress::from([0x1A, 0x2B, 0x3C, 0x4D, 0x5E, 0x6F]),
}; };
db.add_device(&MacAddress::from([0x1A, 0x2B, 0x3C, 0x4D, 0x5E, 0x6F])).await.unwrap(); db.add_device(&MacAddress::from([0x1A, 0x2B, 0x3C, 0x4D, 0x5E, 0x6F]))
db.add_display_name(&MacAddress::from([0x1A, 0x2B, 0x3C, 0x4D, 0x5E, 0x6F]), "Waterlevel daheim").await.unwrap(); .await
.unwrap();
db.add_display_name(
&MacAddress::from([0x1A, 0x2B, 0x3C, 0x4D, 0x5E, 0x6F]),
"Waterlevel daheim",
)
.await
.unwrap();
let devices = db.get_devices().await.unwrap(); let devices = db.get_devices().await.unwrap();
assert_eq!(test_device, devices[0]); assert_eq!(test_device, devices[0]);
} }
} }

View File

@@ -1,9 +1,11 @@
use std::{env, process}; use std::{env, fs, process, str::FromStr};
use crate::schemas::{TelemetryMessageFromDevice, ValueMessageFromDevice}; use crate::schemas::{TelemetryMessageFromDevice, ValueMessageFromDevice};
use actix_web::{get, post, web, App, HttpResponse, HttpServer, Responder}; use actix_web::{get, post, put, web, App, HttpResponse, HttpServer, Responder};
use database::Database; use database::Database;
use dotenvy::dotenv;
use log::{error, info}; use log::{error, info};
use schemas::{BoardConfig, BoardType, Device, OTAConfiguration};
use sqlx::types::mac_address::MacAddress; use sqlx::types::mac_address::MacAddress;
use util::parse_mac_address; use util::parse_mac_address;
@@ -34,7 +36,11 @@ async fn receive_telemetry(
} }
}; };
match data.db.add_telemetry(&telemetry_message, &mac_converted).await { match data
.db
.add_telemetry(&telemetry_message, &mac_converted)
.await
{
Ok(_) => HttpResponse::Created(), Ok(_) => HttpResponse::Created(),
Err(e) => { Err(e) => {
error!("adding Telemetry message to DB failed \n{}", e); error!("adding Telemetry message to DB failed \n{}", e);
@@ -105,7 +111,7 @@ async fn get_value(device_id: web::Path<String>, data: web::Data<AppState>) -> i
HttpResponse::Ok().json(messages) HttpResponse::Ok().json(messages)
} }
#[get("/device/")] #[get("/device")]
async fn get_devices(data: web::Data<AppState>) -> impl Responder { async fn get_devices(data: web::Data<AppState>) -> impl Responder {
info!("GET - devices - Processing"); info!("GET - devices - Processing");
let devices = match data.db.get_devices().await { let devices = match data.db.get_devices().await {
@@ -118,12 +124,54 @@ async fn get_devices(data: web::Data<AppState>) -> impl Responder {
HttpResponse::Ok().json(devices) HttpResponse::Ok().json(devices)
} }
#[put("/firmware/{product}/{config}/{version}")]
async fn upload_firmware(path: web::Path<(String, String, String)>, body: web::Bytes) -> impl Responder {
let (product, config, version) = path.into_inner();
println!("Uploading firmware version: {}", version);
let fw_folder = format!("./firmware/{product}");
fs::create_dir_all(&fw_folder).unwrap();
let file_path = format!("{fw_folder}/firmware_{config}_{version}.bin");
info!("Saving to {file_path}");
tokio::fs::write(&file_path, &body).await.unwrap();
HttpResponse::Ok().body(format!("Firmware version {} uploaded successfully", version))
}
// TODO this is more or less a placeholder. Firmware upload will be more detailed in the future
#[get("/firmware/{product}")]
async fn get_firmware_json(product: web::Path<String>) -> impl Responder {
let product = product.into_inner();
let mut configs = Vec::new();
if let Ok(entries) = fs::read_dir(format!("./firmware/{product}")) {
for entry in entries.flatten() {
let path = entry.path();
if path.is_file() {
println!("File: {:?}", path);
let split_name: Vec<_> = path.file_name().unwrap().to_str().unwrap().split("_").collect();
let version = split_name[2].strip_suffix(".bin").unwrap();
let board_config = BoardConfig::from_str(split_name[1]).unwrap();
let board_type = BoardType::from_str(&product).unwrap();
let cfg = OTAConfiguration{board: board_type, configuration: board_config, version: version.to_string(), url: path.to_str().to_owned().unwrap().to_owned() };
configs.push(cfg);
} else if path.is_dir() {
println!("Directory: {:?}", path);
}
}
} else {
return HttpResponse::InternalServerError().finish()
}
HttpResponse::Ok().json(configs)
}
#[actix_web::main] #[actix_web::main]
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
dotenv().ok();
env_logger::init(); env_logger::init();
info!("Starting"); info!("Starting");
let db_url = match env::var("DATABASE_URL") { let db_url = match env::var("DATABASE_URL") {
Ok(url) => url, Ok(url) => url,
Err(e) => { Err(e) => {
@@ -139,13 +187,16 @@ async fn main() -> std::io::Result<()> {
HttpServer::new(move || { HttpServer::new(move || {
App::new() App::new()
.app_data(web::Data::new(AppState { db: db.clone() })) .app_data(web::Data::new(AppState { db: db.clone() }))
.app_data(web::PayloadConfig::new(256 * 1024 * 1024)) //256MB
.service(receive_telemetry) .service(receive_telemetry)
.service(get_telemetry) .service(get_telemetry)
.service(receive_value) .service(receive_value)
.service(get_value) .service(get_value)
.service(get_devices) .service(get_devices)
.service(upload_firmware)
.service(get_firmware_json)
}) })
.bind(("0.0.0.0", 8080))? .bind(("0.0.0.0", 8484))?
.run() .run()
.await .await
} }

View File

@@ -1,6 +1,8 @@
use chrono::NaiveDateTime; use chrono::NaiveDateTime;
use enum_stringify::EnumStringify;
use serde::{ser::SerializeStruct, Deserialize, Serialize}; use serde::{ser::SerializeStruct, Deserialize, Serialize};
use sqlx::types::mac_address::MacAddress; use sqlx::types::mac_address::MacAddress;
use strum::EnumString;
#[derive(Deserialize, Debug, Serialize)] #[derive(Deserialize, Debug, Serialize)]
pub struct TelemetryMessage { pub struct TelemetryMessage {
@@ -36,10 +38,9 @@ pub struct ValueMessage {
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub struct Device { pub struct Device {
pub display_name: Option<String>, pub display_name: Option<String>,
pub id: MacAddress pub id: MacAddress,
} }
impl Serialize for Device { impl Serialize for Device {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where where
@@ -50,9 +51,35 @@ impl Serialize for Device {
// Serialize each field with custom logic // Serialize each field with custom logic
let bytes = self.id.bytes(); let bytes = self.id.bytes();
state.serialize_field("display_name", &self.display_name)?; state.serialize_field("display_name", &self.display_name)?;
state.serialize_field("id", &format!("{}{}{}{}{}{}", bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5]))?; state.serialize_field(
"id",
&format!(
"{}{}{}{}{}{}",
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5]
),
)?;
// End the serialization process // End the serialization process
state.end() state.end()
} }
} }
#[derive(serde::Serialize)]
pub struct OTAConfiguration {
pub board: BoardType,
pub configuration: BoardConfig,
pub version: String,
pub url: String
}
#[derive(serde::Serialize, EnumString)]
pub enum BoardType {
Waterlevel
}
#[derive(serde::Serialize, EnumString)]
pub enum BoardConfig {
INA226,
INA233
}

View File

@@ -1,12 +1,8 @@
use thiserror::Error; use thiserror::Error;
pub fn parse_mac_address(mac: &str) -> Result<[u8; 6], MacAddressError> { pub fn parse_mac_address(mac: &str) -> Result<[u8; 6], MacAddressError> {
if mac.len() != 12 { if mac.len() != 12 {
return Err(MacAddressError::Length(mac.len())) return Err(MacAddressError::Length(mac.len()));
} }
let mut mac_bytes = [0u8; 6]; let mut mac_bytes = [0u8; 6];
@@ -68,4 +64,3 @@ mod tests {
assert!(parse_mac_address(mac_str).is_err()); assert!(parse_mac_address(mac_str).is_err());
} }
} }