Files
iot-cloud-api/src/main.rs
Tobias Maier 92159bc276
All checks were successful
Build Project / test (push) Successful in 5m38s
Format
2025-01-19 16:14:22 +00:00

248 lines
8.2 KiB
Rust

use std::{env, fs, path::PathBuf, process};
use crate::schemas::{TelemetryMessageFromDevice, ValueMessageFromDevice};
use actix_web::{get, post, put, web, App, HttpResponse, HttpServer, Responder};
use database::Database;
use dotenvy::dotenv;
use log::{debug, error, info, warn};
use schemas::OTAConfigurationList;
use sqlx::types::mac_address::MacAddress;
use util::{get_files, parse_mac_address};
mod database;
mod schemas;
mod util;
struct AppState {
db: Database,
firmwares_path: PathBuf,
hostname: String,
}
#[post("/telemetry/{device_id}")]
async fn receive_telemetry(
device_id: web::Path<String>,
data: web::Data<AppState>,
telemetry_message: web::Json<TelemetryMessageFromDevice>,
) -> impl Responder {
info!("POST - telementry - Processing device id {}", device_id);
let Ok(mac_converted) = parse_mac_address(&device_id) else {
return HttpResponse::InternalServerError();
};
let mac_converted = MacAddress::from(mac_converted);
match data.db.create_device_if_not_exists(&mac_converted).await {
Ok(_) => {}
Err(e) => {
error!("Error creating new device: {}", e);
return HttpResponse::InternalServerError();
}
};
match data
.db
.add_telemetry(&telemetry_message, &mac_converted)
.await
{
Ok(_) => HttpResponse::Created(),
Err(e) => {
error!("adding Telemetry message to DB failed \n{}", e);
HttpResponse::InternalServerError()
}
}
}
#[get("/telemetry/{device_id}")]
async fn get_telemetry(device_id: web::Path<String>, data: web::Data<AppState>) -> impl Responder {
info!("GET - telementry - Processing device id {}", device_id);
let Ok(mac_converted) = parse_mac_address(&device_id) else {
return HttpResponse::InternalServerError().finish();
};
let mac_converted = MacAddress::from(mac_converted);
let messages = match data.db.get_telemetry_for_id(&mac_converted).await {
Ok(msgs) => msgs,
Err(e) => {
error!("Getting Telemetry Messages from DB failed \n{}", e);
return HttpResponse::InternalServerError().finish();
}
};
HttpResponse::Ok().json(messages)
}
#[post("/value/{device_id}")]
async fn receive_value(
device_id: web::Path<String>,
data: web::Data<AppState>,
value_message: web::Json<ValueMessageFromDevice>,
) -> impl Responder {
info!("POST - value - Processing device id {}", device_id);
let Ok(mac_converted) = parse_mac_address(&device_id) else {
return HttpResponse::InternalServerError();
};
let mac_converted = MacAddress::from(mac_converted);
match data.db.create_device_if_not_exists(&mac_converted).await {
Ok(_) => {}
Err(e) => {
error!("Error creating new device: {}", e);
return HttpResponse::InternalServerError();
}
};
match data.db.add_value(&value_message, &mac_converted).await {
Ok(_) => HttpResponse::Created(),
Err(e) => {
error!("adding Telemetry message to DB failed \n{}", e);
HttpResponse::InternalServerError()
}
}
}
#[get("/value/{device_id}")]
async fn get_value(device_id: web::Path<String>, data: web::Data<AppState>) -> impl Responder {
info!("GET - value - Processing device id {}", device_id);
let Ok(mac_converted) = parse_mac_address(&device_id) else {
return HttpResponse::InternalServerError().finish();
};
let mac_converted = MacAddress::from(mac_converted);
let messages = match data.db.get_values_for_id(&mac_converted).await {
Ok(msgs) => msgs,
Err(e) => {
error!("Getting Values from DB failed \n{}", e);
return HttpResponse::InternalServerError().finish();
}
};
HttpResponse::Ok().json(messages)
}
#[get("/device")]
async fn get_devices(data: web::Data<AppState>) -> impl Responder {
info!("GET - devices - Processing");
let devices = match data.db.get_devices().await {
Ok(devs) => devs,
Err(e) => {
error!("Getting Devices from DB failed \n{}", e);
return HttpResponse::InternalServerError().finish();
}
};
HttpResponse::Ok().json(devices)
}
// Upload Firmware file
#[put("/firmware/{device}/{config}/{version}")]
async fn upload_firmware(
data: web::Data<AppState>,
path: web::Path<(String, String, String)>,
body: web::Bytes,
) -> impl Responder {
let (device, config, version) = path.into_inner();
let version = version.replace(".", "-");
let firmware_root_path = &data.firmwares_path;
let firmware_folder = firmware_root_path.join(&device);
let firmware_path = firmware_folder
.join(format!("firmware_{config}_{version}"))
.with_extension("bin");
if firmware_path.is_file() {
warn!("Firmware with product: {device}, config: {config} and version: {version} at path {firmware_path:?} already exists, cant upload");
return HttpResponse::Conflict().body(format!("{firmware_path:?}"));
}
info!("Uploading firmware with product: {device}, config: {config} and version: {version} to {firmware_path:?}");
fs::create_dir_all(&firmware_folder).unwrap();
let x = tokio::fs::write(&firmware_path, &body).await;
debug!("{x:?}");
HttpResponse::Ok().body(format!(
"Firmware version {} uploaded successfully",
version
))
}
#[get("/firmware/{device}")]
async fn get_firmware_json(data: web::Data<AppState>, path: web::Path<String>) -> impl Responder {
let device = path.into_inner();
let fw_path = &data.firmwares_path.join(device);
if fw_path.is_dir() {
match get_files(fw_path, &data.hostname) {
Ok(cfg) => HttpResponse::Ok().json(OTAConfigurationList {
configurations: cfg,
}),
Err(e) => HttpResponse::InternalServerError().body(e.to_string()),
}
} else {
HttpResponse::Ok().json(OTAConfigurationList {
configurations: vec![],
})
}
}
#[get("/firmware/{product}/{config}/{version}.bin")]
async fn serve_firmware(
path: web::Path<(String, String, String)>,
data: web::Data<AppState>,
) -> impl Responder {
let (product, config, version) = path.into_inner();
let version = version.replace(".", "-").replace("_", "-");
let fw_root_path = &data.firmwares_path;
let file_path = PathBuf::from(format!("{product}/firmware_{config}_{version}.bin"));
let file_path = fw_root_path.join(&file_path);
info!("Requested firmware for product: {product}, config: {config} and version: {version}, expected to be stored at {file_path:?}");
if file_path.exists() {
info!("File exists, serving download now");
HttpResponse::Ok()
.content_type("application/octet-stream")
.insert_header((
"Content-Disposition",
format!("attachment; filename=\"{product}_{config}_{version}.bin\""),
))
.body(std::fs::read(file_path).unwrap_or_else(|_| Vec::new()))
} else {
warn!("File does not exist");
HttpResponse::NotFound().body("Firmware version not found")
}
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
dotenv().ok();
env_logger::init();
info!("Starting");
let db_url = match env::var("DATABASE_URL") {
Ok(url) => url,
Err(e) => {
error!("Failed reading DATABASE_URL: {e}");
process::exit(1);
}
};
info!("Connecting to Database {}", db_url);
let db = Database::init(&db_url).await;
db.init_db().await;
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(AppState {
db: db.clone(),
firmwares_path: PathBuf::from("./fw"),
hostname: "127.0.0.1:8282".to_string(),
}))
.app_data(web::PayloadConfig::new(256 * 1024 * 1024)) //256MB
.service(receive_telemetry)
.service(get_telemetry)
.service(receive_value)
.service(get_value)
.service(get_devices)
.service(upload_firmware)
.service(get_firmware_json)
.service(serve_firmware)
})
.bind(("0.0.0.0", 8282))?
.run()
.await
}