diff options
author | yuzu <yuzu@b9215c17-b818-4693-b096-d1e41a411fef> | 2025-07-12 06:17:38 +0000 |
---|---|---|
committer | yuzu <yuzu@b9215c17-b818-4693-b096-d1e41a411fef> | 2025-07-12 06:17:38 +0000 |
commit | 78608add1c69a877b76a05147f6c26b7abe66669 (patch) | |
tree | b2930fbf3c8e5f6bf2f5eb00f260b5b5092e09bb | |
parent | bc0fac1bf2a02c9a76c8c4b9eccc9235de4e86b2 (diff) | |
download | salaryman-78608add1c69a877b76a05147f6c26b7abe66669.tar.gz salaryman-78608add1c69a877b76a05147f6c26b7abe66669.tar.bz2 salaryman-78608add1c69a877b76a05147f6c26b7abe66669.zip |
add start, stop, restart endpoints
git-svn-id: svn+ssh://diminuette.aengel.lesbianunix.dev/salaryman/trunk@14 b9215c17-b818-4693-b096-d1e41a411fef
-rw-r--r-- | Cargo.lock | 1 | ||||
-rw-r--r-- | Cargo.toml | 2 | ||||
-rw-r--r-- | src/service.rs | 21 | ||||
-rw-r--r-- | src/smd/context.rs | 38 | ||||
-rw-r--r-- | src/smd/endpoints.rs | 202 | ||||
-rw-r--r-- | src/smd/main.rs | 60 |
6 files changed, 285 insertions, 39 deletions
diff --git a/Cargo.lock b/Cargo.lock index 498c7bd..9752728 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1240,6 +1240,7 @@ dependencies = [ "clap", "dropshot", "schemars", + "semver", "serde", "tokio", "toml", diff --git a/Cargo.toml b/Cargo.toml index 18432a7..466871a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,12 +12,14 @@ smd = [ "dep:dropshot", "dep:schemars", "dep:toml", + "dep:semver", ] [dependencies] clap = { version = "4.5.39", features = ["derive"], optional = true } dropshot = { version = "0.16.2", features = ["usdt","usdt-probes"], optional = true } schemars = { version = "0.8.22", features = ["uuid1"], optional = true } +semver = { version = "1.0.26", optional = true } serde = { version = "1.0.219", features = ["derive"] } tokio = { version = "1.45.1", features = ["full"] } toml = { version = "0.8.22", optional = true } diff --git a/src/service.rs b/src/service.rs index dee6e59..22cf9c4 100644 --- a/src/service.rs +++ b/src/service.rs @@ -149,6 +149,17 @@ impl Service { Ok(()) } /** + * Calls self.start(), then self.scan_stdout(), and finally self.scan_stderr() + */ + #[inline] + pub async fn start_with_output(&mut self) -> Result<(), Box<dyn std::error::Error>> { + self.start().await?; + self.scan_stdout().await?; + self.scan_stderr().await?; + Ok(()) + } + //TODO: process monitoring! + /** * Returns true when process is started and false when process is stopped. */ pub async fn started(&self) -> bool { @@ -176,12 +187,22 @@ impl Service { /** * Restarts service process */ + #[inline] pub async fn restart(&mut self) -> Result<(), Box<dyn std::error::Error>> { self.stop().await?; self.start().await?; Ok(()) } /** + * Restarts service process + */ + #[inline] + pub async fn restart_with_output(&mut self) -> Result<(), Box<dyn std::error::Error>> { + self.stop().await?; + self.start_with_output().await?; + Ok(()) + } + /** * Takes control of service process' stdout file handle and spawns a new task to continuously * scan it. */ diff --git a/src/smd/context.rs b/src/smd/context.rs index d8194c5..5d8038c 100644 --- a/src/smd/context.rs +++ b/src/smd/context.rs @@ -1,27 +1,47 @@ -use super::Config; -use salaryman::service::Service; +use salaryman::service::{Service, ServiceConf}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::sync::Arc; use tokio::sync::Mutex; +use uuid::Uuid; + +pub struct SalarymanService { + pub config: ServiceConf, + pub service: Arc<Mutex<Service>>, +} +impl SalarymanService { + pub fn new() -> Self { + Self { + config: ServiceConf::new(), + service: Arc::new(Mutex::new(Service::new())), + } + } + pub fn from_parts(config: ServiceConf, service: Arc<Mutex<Service>>) -> Self { + Self { config, service } + } +} pub struct SalarymanDContext { - pub config: Config, - pub service: Vec<Arc<Mutex<Service>>>, + pub services: Vec<Arc<SalarymanService>>, } impl SalarymanDContext { pub fn new() -> Self { Self { - config: Config::new(), - service: Vec::new(), + services: Vec::new(), } } - pub fn from_parts(config: Config, service: Vec<Arc<Mutex<Service>>>) -> Self { - Self { config, service } + pub fn from_vec(services: Vec<Arc<SalarymanService>>) -> Self { + Self { services } } } #[derive(Serialize, Deserialize, JsonSchema, Debug)] pub struct StdinBuffer { - pub string: String, + pub stdin: String, + pub endl: Option<bool>, +} + +#[derive(Serialize, Deserialize, JsonSchema, Debug)] +pub struct ServicePath { + pub service_uuid: Uuid, } diff --git a/src/smd/endpoints.rs b/src/smd/endpoints.rs index 48c9720..f0ba0ea 100644 --- a/src/smd/endpoints.rs +++ b/src/smd/endpoints.rs @@ -1,37 +1,205 @@ -use super::{ - Config, - context::{SalarymanDContext, StdinBuffer}, -}; -use dropshot::{HttpError, HttpResponseOk, RequestContext, TypedBody, endpoint}; +use super::context::{SalarymanDContext, SalarymanService, ServicePath, StdinBuffer}; +use dropshot::{HttpError, HttpResponseOk, Path, RequestContext, TypedBody, endpoint}; +use salaryman::service::ServiceConf; use std::sync::Arc; #[endpoint { method = GET, - path = "/config", + path = "/service", }] -pub async fn endpoint_get_config( +pub async fn endpoint_get_services( rqctx: RequestContext<Arc<SalarymanDContext>>, -) -> Result<HttpResponseOk<Config>, HttpError> { - Ok(HttpResponseOk(rqctx.context().config.clone())) +) -> Result<HttpResponseOk<Vec<ServiceConf>>, HttpError> { + let ret = { + let ctx = rqctx.context(); + let mut v: Vec<ServiceConf> = Vec::new(); + for i in 0..ctx.services.len() { + v.push(ctx.services[i].config.clone()); + } + v + }; + Ok(HttpResponseOk(ret)) +} +#[endpoint { + method = GET, + path = "/service/{service_uuid}", +}] +pub async fn endpoint_get_service( + rqctx: RequestContext<Arc<SalarymanDContext>>, + path_params: Path<ServicePath>, +) -> Result<HttpResponseOk<ServiceConf>, HttpError> { + let u = path_params.into_inner().service_uuid; + let ctx = rqctx.context(); + let mut service: Option<Arc<SalarymanService>> = None; + for i in 0..ctx.services.len() { + if ctx.services[i].config.uuid == u { + service = Some(ctx.services[i].clone()); + } else { + continue; + } + } + let s = match service { + Some(s) => s.config.clone(), + None => { + return Err(HttpError::for_unavail( + None, + String::from("Service Not Found"), + )); + } + }; + Ok(HttpResponseOk(s)) +} +#[endpoint { + method = GET, + path = "/service/{service_uuid}/start", +}] +pub async fn endpoint_start_service( + rqctx: RequestContext<Arc<SalarymanDContext>>, + path_params: Path<ServicePath>, +) -> Result<HttpResponseOk<()>, HttpError> { + let u = path_params.into_inner().service_uuid; + let ctx = rqctx.context(); + let mut service: Option<Arc<SalarymanService>> = None; + for i in 0..ctx.services.len() { + if ctx.services[i].config.uuid == u { + service = Some(ctx.services[i].clone()); + } else { + continue; + } + } + match service { + Some(s) => { + let mut lock = s.service.lock().await; + match lock.start_with_output().await { + Ok(_) => (), + Err(e) => return Err(HttpError::for_internal_error(e.to_string())), + } + } + None => { + return Err(HttpError::for_unavail( + None, + String::from("Service Not Found"), + )); + } + }; + Ok(HttpResponseOk(())) +} +#[endpoint { + method = GET, + path = "/service/{service_uuid}/stop", +}] +pub async fn endpoint_stop_service( + rqctx: RequestContext<Arc<SalarymanDContext>>, + path_params: Path<ServicePath>, +) -> Result<HttpResponseOk<()>, HttpError> { + let u = path_params.into_inner().service_uuid; + let ctx = rqctx.context(); + let mut service: Option<Arc<SalarymanService>> = None; + for i in 0..ctx.services.len() { + if ctx.services[i].config.uuid == u { + service = Some(ctx.services[i].clone()); + } else { + continue; + } + } + match service { + Some(s) => { + let mut lock = s.service.lock().await; + match lock.stop().await { + Ok(_) => (), + Err(e) => return Err(HttpError::for_internal_error(e.to_string())), + } + } + None => { + return Err(HttpError::for_unavail( + None, + String::from("Service Not Found"), + )); + } + }; + Ok(HttpResponseOk(())) +} +#[endpoint { + method = GET, + path = "/service/{service_uuid}/restart", +}] +pub async fn endpoint_restart_service( + rqctx: RequestContext<Arc<SalarymanDContext>>, + path_params: Path<ServicePath>, +) -> Result<HttpResponseOk<()>, HttpError> { + let u = path_params.into_inner().service_uuid; + let ctx = rqctx.context(); + let mut service: Option<Arc<SalarymanService>> = None; + for i in 0..ctx.services.len() { + if ctx.services[i].config.uuid == u { + service = Some(ctx.services[i].clone()); + } else { + continue; + } + } + match service { + Some(s) => { + let mut lock = s.service.lock().await; + match lock.restart_with_output().await { + Ok(_) => (), + Err(e) => return Err(HttpError::for_internal_error(e.to_string())), + } + } + None => { + return Err(HttpError::for_unavail( + None, + String::from("Service Not Found"), + )); + } + }; + Ok(HttpResponseOk(())) } - #[endpoint { method = PUT, - path = "/services/write" + path = "/service/{service_uuid}/write" }] pub async fn endpoint_post_stdin( rqctx: RequestContext<Arc<SalarymanDContext>>, + path_params: Path<ServicePath>, update: TypedBody<StdinBuffer>, ) -> Result<HttpResponseOk<()>, HttpError> { let ctx = rqctx.context(); let stdin_str = update.into_inner(); - for i in 0..ctx.service.len() { - let mut lock = ctx.service[i].lock().await; - if lock.started().await { - lock.writeln_stdin(stdin_str.string.clone()).await.unwrap(); //TODO: PROPERLY HANDLE ERROR! + let u = path_params.into_inner().service_uuid; + let mut service: Option<Arc<SalarymanService>> = None; + for i in 0..ctx.services.len() { + if ctx.services[i].config.uuid == u { + service = Some(ctx.services[i].clone()); + } else { + continue; + } + } + match service { + Some(s) => { + let mut lock = s.service.lock().await; + if lock.started().await { + let b = if let Some(endl) = stdin_str.endl { + if endl { + lock.writeln_stdin(stdin_str.stdin.clone()).await //TODO: PROPERLY HANDLE ERROR! + } else { + lock.write_stdin(stdin_str.stdin.clone()).await //TODO: PROPERLY HANDLE ERROR! + } + } else { + lock.writeln_stdin(stdin_str.stdin.clone()).await //TODO: PROPERLY HANDLE ERROR! + }; + match b { + Ok(_) => (), + Err(e) => return Err(HttpError::for_internal_error(e.to_string())), + } + } + drop(lock); + } + None => { + return Err(HttpError::for_unavail( + None, + String::from("Service Not Found"), + )); } - drop(lock); } - Ok(HttpResponseOk(())) } diff --git a/src/smd/main.rs b/src/smd/main.rs index 628785a..b81df2f 100644 --- a/src/smd/main.rs +++ b/src/smd/main.rs @@ -2,16 +2,23 @@ mod context; mod endpoints; use clap::Parser; -use dropshot::{ApiDescription, ConfigLogging, ConfigLoggingLevel, ServerBuilder}; +use dropshot::{ApiDescription, ConfigDropshot, ConfigLogging, ConfigLoggingLevel, ServerBuilder}; use salaryman::service::{Service, ServiceConf}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use tokio::{fs::read_to_string, sync::Mutex}; -use std::{net::IpAddr, path::PathBuf, sync::Arc}; +use std::{ + net::{IpAddr, SocketAddr}, + path::PathBuf, + sync::Arc, +}; -use crate::context::SalarymanDContext; -use crate::endpoints::{endpoint_get_config, endpoint_post_stdin}; +use crate::context::{SalarymanDContext, SalarymanService}; +use crate::endpoints::{ + endpoint_get_service, endpoint_get_services, endpoint_post_stdin, endpoint_restart_service, + endpoint_start_service, endpoint_stop_service, +}; #[derive(Parser, Debug)] #[command(version, about, long_about = None)] @@ -89,26 +96,53 @@ async fn load_config(file: &PathBuf) -> Result<Config, Box<dyn std::error::Error async fn main() -> Result<(), Box<dyn std::error::Error>> { let args = Args::parse(); let conf: Config = load_config(&args.config).await?; - let mut services: Vec<Arc<Mutex<Service>>> = Vec::new(); + let addr = if let Some(addr) = conf.address { + addr + } else { + args.address + }; + let port = if let Some(port) = conf.port { + port + } else { + args.port + }; + let bind = SocketAddr::new(addr, port); + let mut services: Vec<Arc<SalarymanService>> = Vec::new(); for i in 0..conf.service.len() { - services.push(Arc::new(Mutex::new(Service::from_conf(&conf.service[i])))); - if conf.service[i].autostart { - let mut lock = services[i].lock().await; + services.push(Arc::new(SalarymanService::from_parts( + conf.service[i].clone(), + Arc::new(Mutex::new(Service::from_conf(&conf.service[i]))), + ))); + } + for i in 0..services.len() { + if services[i].config.autostart { + let mut lock = services[i].service.lock().await; lock.start().await?; lock.scan_stdout().await?; lock.scan_stderr().await?; - drop(lock); } } let log_conf = ConfigLogging::StderrTerminal { level: ConfigLoggingLevel::Info, }; let log = log_conf.to_logger("smd")?; - let ctx = Arc::new(SalarymanDContext::from_parts(conf, services)); + let ctx = Arc::new(SalarymanDContext::from_vec(services)); + let config = ConfigDropshot { + bind_address: bind, + ..Default::default() + }; let mut api = ApiDescription::new(); - api.register(endpoint_get_config).unwrap(); - api.register(endpoint_post_stdin).unwrap(); - let server = ServerBuilder::new(api, ctx.clone(), log).start()?; + api.register(endpoint_get_services)?; + api.register(endpoint_get_service)?; + api.register(endpoint_start_service)?; + api.register(endpoint_stop_service)?; + api.register(endpoint_restart_service)?; + api.register(endpoint_post_stdin)?; + api.openapi("Salaryman", semver::Version::new(1, 0, 0)) + .write(&mut std::io::stdout())?; + let server = ServerBuilder::new(api, ctx.clone(), log) + .config(config) + .start()?; server.await?; Ok(()) } |