diff options
author | yuzu <yuzu@b9215c17-b818-4693-b096-d1e41a411fef> | 2025-07-29 09:26:15 +0000 |
---|---|---|
committer | yuzu <yuzu@b9215c17-b818-4693-b096-d1e41a411fef> | 2025-07-29 09:26:15 +0000 |
commit | 40a65f981a664a186a52fc2981b99a6d8a1191d4 (patch) | |
tree | cc448f8521d3caa1f1d6396ad67e9b09bf648b05 /src/server | |
parent | e33f9a59f875edf1240ca80c1014235296ff3cbf (diff) | |
download | salaryman-40a65f981a664a186a52fc2981b99a6d8a1191d4.tar.gz salaryman-40a65f981a664a186a52fc2981b99a6d8a1191d4.tar.bz2 salaryman-40a65f981a664a186a52fc2981b99a6d8a1191d4.zip |
parallel process monitoring get
git-svn-id: svn+ssh://diminuette.aengel.lesbianunix.dev/salaryman/trunk@16 b9215c17-b818-4693-b096-d1e41a411fef
Diffstat (limited to 'src/server')
-rw-r--r-- | src/server/context.rs | 47 | ||||
-rw-r--r-- | src/server/endpoints.rs | 359 | ||||
-rw-r--r-- | src/server/main.rs | 137 |
3 files changed, 34 insertions, 509 deletions
diff --git a/src/server/context.rs b/src/server/context.rs deleted file mode 100644 index 132e2dc..0000000 --- a/src/server/context.rs +++ /dev/null @@ -1,47 +0,0 @@ -use super::Config; -use salaryman::service::{Service, ServiceConf}; -use std::path::PathBuf; -use std::sync::Arc; -use tokio::sync::RwLock; - -pub struct SalarymanService { - pub config: ServiceConf, - pub service: Arc<RwLock<Service>>, -} -impl SalarymanService { - pub fn new() -> Self { - Self { - config: ServiceConf::new(), - service: Arc::new(RwLock::new(Service::new())), - } - } - pub fn from_parts(config: ServiceConf, service: Arc<RwLock<Service>>) -> Self { - Self { config, service } - } -} - -pub struct SalarymanDContext { - pub services: RwLock<Vec<Arc<SalarymanService>>>, - pub save_file: PathBuf, - pub config: Arc<RwLock<Config>>, -} -impl SalarymanDContext { - pub fn new() -> Self { - Self { - services: RwLock::new(Vec::new()), - save_file: PathBuf::from(""), - config: Arc::new(RwLock::new(Config::new())), - } - } - pub fn from_parts( - services: RwLock<Vec<Arc<SalarymanService>>>, - save_file: PathBuf, - config: Arc<RwLock<Config>>, - ) -> Self { - Self { - services, - save_file, - config, - } - } -} diff --git a/src/server/endpoints.rs b/src/server/endpoints.rs deleted file mode 100644 index 8ebc42e..0000000 --- a/src/server/endpoints.rs +++ /dev/null @@ -1,359 +0,0 @@ -use super::Config; -use super::context::{SalarymanDContext, SalarymanService}; -use dropshot::{HttpError, HttpResponseOk, Path, RequestContext, TypedBody, endpoint}; -use salaryman::model::{NewService, ServicePath, StdinBuffer, UpdateConf}; -use salaryman::service::{Service, ServiceConf}; -use std::sync::Arc; -use tokio::fs::File; -use tokio::io::AsyncWriteExt; -use tokio::sync::RwLock; - -#[endpoint { - method = GET, - path = "/config", -}] -pub async fn endpoint_get_config( - rqctx: RequestContext<Arc<SalarymanDContext>>, -) -> Result<HttpResponseOk<Config>, HttpError> { - let ctx = rqctx.context(); - let lock = ctx.config.read().await; - let conf = lock.clone(); - drop(lock); - Ok(HttpResponseOk(conf)) -} -#[endpoint { - method = PUT, - path = "/config", -}] -pub async fn endpoint_put_config( - rqctx: RequestContext<Arc<SalarymanDContext>>, - body: TypedBody<UpdateConf>, -) -> Result<HttpResponseOk<()>, HttpError> { - let ctx = rqctx.context(); - let update = body.into_inner(); - if let Some(addr) = update.address { - let mut lock = ctx.config.write().await; - lock.address = Some(addr); - drop(lock); - } - if let Some(port) = update.port { - let mut lock = ctx.config.write().await; - lock.port = Some(port); - drop(lock); - } - Ok(HttpResponseOk(())) -} -#[endpoint { - method = GET, - path = "/config/save" -}] -pub async fn endpoint_get_config_save( - rqctx: RequestContext<Arc<SalarymanDContext>>, -) -> Result<HttpResponseOk<()>, HttpError> { - let ctx = rqctx.context(); - let mut v: Vec<ServiceConf> = Vec::new(); - let rlock = ctx.services.read().await; - for i in 0..rlock.len() { - v.push(rlock[i].config.clone()); - } - drop(rlock); - let rlock = ctx.config.read().await; - let save = Config { - address: rlock.address, - port: rlock.port, - user: rlock.user.clone(), - service: v, - }; - drop(rlock); - let mut f = match File::open(&ctx.save_file).await { - Ok(f) => f, - Err(_) => { - return Err(HttpError::for_internal_error(String::from( - "cannot open desired file", - ))); - } - }; - let save = match toml::to_string(&save) { - Ok(s) => s, - Err(_) => { - return Err(HttpError::for_internal_error(String::from( - "cannot serialize config!", - ))); - } - }; - match f.write_all(save.as_str().as_bytes()).await { - Ok(_) => (), - Err(_) => { - return Err(HttpError::for_internal_error(String::from( - "could not write to file!", - ))); - } - } - Ok(HttpResponseOk(())) -} -#[endpoint { - method = GET, - path = "/service", -}] -pub async fn endpoint_get_services( - rqctx: RequestContext<Arc<SalarymanDContext>>, -) -> Result<HttpResponseOk<Vec<ServiceConf>>, HttpError> { - let ret = { - let ctx = rqctx.context(); - let mut v: Vec<ServiceConf> = Vec::new(); - let lock = ctx.services.read().await; - for i in 0..lock.len() { - v.push(lock[i].config.clone()); - } - v - }; - Ok(HttpResponseOk(ret)) -} -#[endpoint { - method = POST, - path = "/service", -}] -pub async fn endpoint_post_service( - rqctx: RequestContext<Arc<SalarymanDContext>>, - body: TypedBody<NewService>, -) -> Result<HttpResponseOk<ServiceConf>, HttpError> { - let ctx = rqctx.context(); - let body = body.into_inner(); - let mut s: ServiceConf = ServiceConf::new(); - if let Some(name) = &body.name { - s.name = name.clone().to_owned(); - } else { - return Err(HttpError::for_bad_request( - None, - String::from("name field is required!"), - )); - } - if let Some(command) = &body.command { - s.command = command.clone().to_owned(); - } else { - return Err(HttpError::for_bad_request( - None, - String::from("command field is required!"), - )); - } - if let Some(args) = &body.args { - if let Some(args) = args { - s.args = Some(args.clone().to_owned()); - } - } - if let Some(dir) = &body.directory { - if let Some(dir) = dir { - s.directory = Some(dir.clone().to_owned()); - } - } - if let Some(auto) = &body.autostart { - s.autostart = auto.clone().to_owned(); - } else { - s.autostart = false; - } - let service: SalarymanService = - SalarymanService::from_parts(s.clone(), Arc::new(RwLock::new(Service::from_conf(&s)))); - if service.config.autostart { - let mut lock = service.service.write().await; - match lock.start_with_output().await { - Ok(_) => (), - Err(_) => (), - } - drop(lock); - } - let mut lock = ctx.config.write().await; - lock.service.push(s.clone()); - drop(lock); - let mut lock = ctx.services.write().await; - lock.push(Arc::new(service)); - drop(lock); - Ok(HttpResponseOk(ServiceConf::new())) -} -#[endpoint { - method = GET, - path = "/service/{service_uuid}", -}] -pub async fn endpoint_get_service( - rqctx: RequestContext<Arc<SalarymanDContext>>, - path_params: Path<ServicePath>, -) -> Result<HttpResponseOk<ServiceConf>, HttpError> { - let u = path_params.into_inner().service_uuid; - let ctx = rqctx.context(); - let mut service: Option<Arc<SalarymanService>> = None; - let lock = ctx.services.read().await; - for i in 0..lock.len() { - if lock[i].config.uuid == u { - service = Some(lock[i].clone()); - } else { - continue; - } - } - let s = match service { - Some(s) => s.config.clone(), - None => { - return Err(HttpError::for_unavail( - None, - String::from("Service Not Found"), - )); - } - }; - Ok(HttpResponseOk(s)) -} -#[endpoint { - method = GET, - path = "/service/{service_uuid}/start", -}] -pub async fn endpoint_start_service( - rqctx: RequestContext<Arc<SalarymanDContext>>, - path_params: Path<ServicePath>, -) -> Result<HttpResponseOk<()>, HttpError> { - let u = path_params.into_inner().service_uuid; - let ctx = rqctx.context(); - let mut service: Option<Arc<SalarymanService>> = None; - let lock = ctx.services.read().await; - for i in 0..lock.len() { - if lock[i].config.uuid == u { - service = Some(lock[i].clone()); - } else { - continue; - } - } - match service { - Some(s) => { - let mut lock = s.service.write().await; - match lock.start_with_output().await { - Ok(_) => (), - Err(e) => return Err(HttpError::for_internal_error(e.to_string())), - } - } - None => { - return Err(HttpError::for_unavail( - None, - String::from("Service Not Found"), - )); - } - }; - Ok(HttpResponseOk(())) -} -#[endpoint { - method = GET, - path = "/service/{service_uuid}/stop", -}] -pub async fn endpoint_stop_service( - rqctx: RequestContext<Arc<SalarymanDContext>>, - path_params: Path<ServicePath>, -) -> Result<HttpResponseOk<()>, HttpError> { - let u = path_params.into_inner().service_uuid; - let ctx = rqctx.context(); - let mut service: Option<Arc<SalarymanService>> = None; - let lock = ctx.services.read().await; - for i in 0..lock.len() { - if lock[i].config.uuid == u { - service = Some(lock[i].clone()); - } else { - continue; - } - } - match service { - Some(s) => { - let mut lock = s.service.write().await; - match lock.stop().await { - Ok(_) => (), - Err(e) => return Err(HttpError::for_internal_error(e.to_string())), - } - } - None => { - return Err(HttpError::for_unavail( - None, - String::from("Service Not Found"), - )); - } - }; - Ok(HttpResponseOk(())) -} -#[endpoint { - method = GET, - path = "/service/{service_uuid}/restart", -}] -pub async fn endpoint_restart_service( - rqctx: RequestContext<Arc<SalarymanDContext>>, - path_params: Path<ServicePath>, -) -> Result<HttpResponseOk<()>, HttpError> { - let u = path_params.into_inner().service_uuid; - let ctx = rqctx.context(); - let mut service: Option<Arc<SalarymanService>> = None; - let lock = ctx.services.read().await; - for i in 0..lock.len() { - if lock[i].config.uuid == u { - service = Some(lock[i].clone()); - } else { - continue; - } - } - match service { - Some(s) => { - let mut lock = s.service.write().await; - match lock.restart_with_output().await { - Ok(_) => (), - Err(e) => return Err(HttpError::for_internal_error(e.to_string())), - } - } - None => { - return Err(HttpError::for_unavail( - None, - String::from("Service Not Found"), - )); - } - }; - Ok(HttpResponseOk(())) -} -#[endpoint { - method = PUT, - path = "/service/{service_uuid}/write" -}] -pub async fn endpoint_post_stdin( - rqctx: RequestContext<Arc<SalarymanDContext>>, - path_params: Path<ServicePath>, - update: TypedBody<StdinBuffer>, -) -> Result<HttpResponseOk<()>, HttpError> { - let ctx = rqctx.context(); - let stdin_str = update.into_inner(); - let u = path_params.into_inner().service_uuid; - let mut service: Option<Arc<SalarymanService>> = None; - let lock = ctx.services.read().await; - for i in 0..lock.len() { - if lock[i].config.uuid == u { - service = Some(lock[i].clone()); - } else { - continue; - } - } - match service { - Some(s) => { - let mut lock = s.service.write().await; - if lock.started().await { - let b = if let Some(endl) = stdin_str.endl { - if endl { - lock.writeln_stdin(stdin_str.stdin.clone()).await //TODO: PROPERLY HANDLE ERROR! - } else { - lock.write_stdin(stdin_str.stdin.clone()).await //TODO: PROPERLY HANDLE ERROR! - } - } else { - lock.writeln_stdin(stdin_str.stdin.clone()).await //TODO: PROPERLY HANDLE ERROR! - }; - match b { - Ok(_) => (), - Err(e) => return Err(HttpError::for_internal_error(e.to_string())), - } - } - drop(lock); - } - None => { - return Err(HttpError::for_unavail( - None, - String::from("Service Not Found"), - )); - } - } - Ok(HttpResponseOk(())) -} diff --git a/src/server/main.rs b/src/server/main.rs index 7557145..b769fbf 100644 --- a/src/server/main.rs +++ b/src/server/main.rs @@ -1,25 +1,12 @@ -mod context; -mod endpoints; - use clap::Parser; -use dropshot::{ApiDescription, ConfigDropshot, ConfigLogging, ConfigLoggingLevel, ServerBuilder}; -use salaryman::service::{Service, ServiceConf}; -use schemars::JsonSchema; +use salaryman::service::{Service, ServiceConf, ServiceState}; use serde::{Deserialize, Serialize}; -use tokio::{fs::read_to_string, sync::RwLock}; - use std::{ - net::{IpAddr, SocketAddr}, + fs::read_to_string, + os::unix::net::{UnixListener, UnixStream}, path::PathBuf, - sync::Arc, -}; - -use crate::context::{SalarymanDContext, SalarymanService}; -use crate::endpoints::{ - endpoint_get_config, endpoint_get_config_save, endpoint_get_service, endpoint_get_services, - endpoint_post_service, endpoint_post_stdin, endpoint_put_config, endpoint_restart_service, - endpoint_start_service, endpoint_stop_service, }; +use rayon::prelude::*; #[derive(Parser, Debug)] #[command(version, about, long_about = None)] @@ -35,47 +22,33 @@ struct Args { #[arg( short, long, - value_name = "ADDR", - help = "IP address to bind API to", - default_value = "127.0.0.1" + value_name = "SOCK", + help = "UNIX socket to bind", + default_value = "/tmp/salaryman.sock" )] - address: IpAddr, - #[arg( - short, - long, - value_name = "PORT", - help = "TCP Port to bind API to", - default_value = "3080" - )] - port: u16, + socket: PathBuf, } -#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug)] -pub struct User { - pub username: String, - pub token: String, +pub enum ServiceReq { + Create(ServiceConf), } -#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct Config { - pub address: Option<IpAddr>, - pub port: Option<u16>, - pub user: Vec<User>, + pub socket: Option<PathBuf>, pub service: Vec<ServiceConf>, } impl Config { pub fn new() -> Self { Self { - address: None, - port: None, - user: Vec::new(), + socket: None, service: Vec::new(), } } } -async fn load_config(file: &PathBuf) -> Result<Config, Box<dyn std::error::Error>> { - let s: String = match read_to_string(file).await { +fn load_config(file: &PathBuf) -> Result<Config, Box<dyn std::error::Error>> { + let s: String = match read_to_string(file) { Ok(s) => s, Err(_) => { return Err(Box::new(std::io::Error::new( @@ -93,70 +66,28 @@ async fn load_config(file: &PathBuf) -> Result<Config, Box<dyn std::error::Error } } -#[tokio::main] -async fn main() -> Result<(), Box<dyn std::error::Error>> { +fn main() -> Result<(), Box<dyn std::error::Error>> { let args = Args::parse(); - let conf: Config = load_config(&args.config).await?; - let addr = if let Some(addr) = conf.address { - addr - } else { - args.address - }; - let port = if let Some(port) = conf.port { - port + let conf: Config = load_config(&args.config)?; + let _sockaddr = if let Some(sock) = conf.socket { + sock } else { - args.port + args.socket }; - let bind = SocketAddr::new(addr, port); - let services: RwLock<Vec<Arc<SalarymanService>>> = RwLock::new(Vec::new()); - for i in 0..conf.service.len() { - let mut lock = services.write().await; - lock.push(Arc::new(SalarymanService::from_parts( - conf.service[i].clone(), - Arc::new(RwLock::new(Service::from_conf(&conf.service[i]))), - ))); - drop(lock); + let mut services: Vec<Service> = Vec::new(); + for service in conf.service { + services.push(service.build()?); } - let lock = services.write().await; - for i in 0..lock.len() { - if lock[i].config.autostart { - let mut l = lock[i].service.write().await; - l.start().await?; - l.scan_stdout().await?; - l.scan_stderr().await?; - drop(l); - } + loop { + services.par_iter_mut() + .for_each(|service| { + match service.state().expect("unable to get service state") { + ServiceState::Failed => service.restart().expect("unable to restart service"), + ServiceState::Stopped => (), + _ => (), + } + }); + services.push(Service::new()); + std::thread::sleep(std::time::Duration::from_millis(100)); } - drop(lock); - let log_conf = ConfigLogging::StderrTerminal { - level: ConfigLoggingLevel::Info, - }; - let log = log_conf.to_logger("smd")?; - let ctx = Arc::new(SalarymanDContext::from_parts( - services, - args.config, - Arc::new(RwLock::new(conf)), - )); - let config = ConfigDropshot { - bind_address: bind, - ..Default::default() - }; - let mut api = ApiDescription::new(); - api.register(endpoint_get_services)?; - api.register(endpoint_get_service)?; - api.register(endpoint_start_service)?; - api.register(endpoint_stop_service)?; - api.register(endpoint_restart_service)?; - api.register(endpoint_post_stdin)?; - api.register(endpoint_post_service)?; - api.register(endpoint_get_config)?; - api.register(endpoint_put_config)?; - api.register(endpoint_get_config_save)?; - api.openapi("Salaryman", semver::Version::new(1, 0, 0)) - .write(&mut std::io::stdout())?; - let server = ServerBuilder::new(api, ctx.clone(), log) - .config(config) - .start()?; - server.await?; - Ok(()) } |