diff options
author | yuzu <yuzu@b9215c17-b818-4693-b096-d1e41a411fef> | 2025-07-29 09:26:15 +0000 |
---|---|---|
committer | yuzu <yuzu@b9215c17-b818-4693-b096-d1e41a411fef> | 2025-07-29 09:26:15 +0000 |
commit | 40a65f981a664a186a52fc2981b99a6d8a1191d4 (patch) | |
tree | cc448f8521d3caa1f1d6396ad67e9b09bf648b05 /src | |
parent | e33f9a59f875edf1240ca80c1014235296ff3cbf (diff) | |
download | salaryman-40a65f981a664a186a52fc2981b99a6d8a1191d4.tar.gz salaryman-40a65f981a664a186a52fc2981b99a6d8a1191d4.tar.bz2 salaryman-40a65f981a664a186a52fc2981b99a6d8a1191d4.zip |
parallel process monitoring get
git-svn-id: svn+ssh://diminuette.aengel.lesbianunix.dev/salaryman/trunk@16 b9215c17-b818-4693-b096-d1e41a411fef
Diffstat (limited to 'src')
-rw-r--r-- | src/cli/main.rs | 3 | ||||
-rw-r--r-- | src/lib.rs | 3 | ||||
-rw-r--r-- | src/model.rs | 31 | ||||
-rw-r--r-- | src/server/context.rs | 47 | ||||
-rw-r--r-- | src/server/endpoints.rs | 359 | ||||
-rw-r--r-- | src/server/main.rs | 137 | ||||
-rw-r--r-- | src/service.rs | 348 |
7 files changed, 205 insertions, 723 deletions
diff --git a/src/cli/main.rs b/src/cli/main.rs index 1b85472..078760e 100644 --- a/src/cli/main.rs +++ b/src/cli/main.rs @@ -1,4 +1,3 @@ -#[tokio::main] -async fn main() -> Result<(), Box<dyn std::error::Error>> { +fn main() -> Result<(), Box<dyn std::error::Error>> { Ok(()) } diff --git a/src/lib.rs b/src/lib.rs index 95486cd..1f278a4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1 @@ pub mod service; - -#[cfg(feature = "models")] -pub mod model; diff --git a/src/model.rs b/src/model.rs deleted file mode 100644 index 578d9b4..0000000 --- a/src/model.rs +++ /dev/null @@ -1,31 +0,0 @@ -use schemars::JsonSchema; -use serde::{Deserialize, Serialize}; -use std::net::IpAddr; -use std::path::PathBuf; -use uuid::Uuid; - -#[derive(Serialize, Deserialize, JsonSchema, Debug)] -pub struct UpdateConf { - pub address: Option<IpAddr>, - pub port: Option<u16>, -} - -#[derive(Serialize, Deserialize, JsonSchema, Debug)] -pub struct StdinBuffer { - pub stdin: String, - pub endl: Option<bool>, -} - -#[derive(Serialize, Deserialize, JsonSchema, Debug)] -pub struct ServicePath { - pub service_uuid: Uuid, -} - -#[derive(Serialize, Deserialize, JsonSchema, Debug)] -pub struct NewService { - pub name: Option<String>, - pub command: Option<String>, - pub args: Option<Option<String>>, - pub directory: Option<Option<PathBuf>>, - pub autostart: Option<bool>, -} diff --git a/src/server/context.rs b/src/server/context.rs deleted file mode 100644 index 132e2dc..0000000 --- a/src/server/context.rs +++ /dev/null @@ -1,47 +0,0 @@ -use super::Config; -use salaryman::service::{Service, ServiceConf}; -use std::path::PathBuf; -use std::sync::Arc; -use tokio::sync::RwLock; - -pub struct SalarymanService { - pub config: ServiceConf, - pub service: Arc<RwLock<Service>>, -} -impl SalarymanService { - pub fn new() -> Self { - Self { - config: ServiceConf::new(), - service: Arc::new(RwLock::new(Service::new())), - } - } - pub fn from_parts(config: ServiceConf, service: Arc<RwLock<Service>>) -> Self { - Self { config, service } - } -} - -pub struct SalarymanDContext { - pub services: RwLock<Vec<Arc<SalarymanService>>>, - pub save_file: PathBuf, - pub config: Arc<RwLock<Config>>, -} -impl SalarymanDContext { - pub fn new() -> Self { - Self { - services: RwLock::new(Vec::new()), - save_file: PathBuf::from(""), - config: Arc::new(RwLock::new(Config::new())), - } - } - pub fn from_parts( - services: RwLock<Vec<Arc<SalarymanService>>>, - save_file: PathBuf, - config: Arc<RwLock<Config>>, - ) -> Self { - Self { - services, - save_file, - config, - } - } -} diff --git a/src/server/endpoints.rs b/src/server/endpoints.rs deleted file mode 100644 index 8ebc42e..0000000 --- a/src/server/endpoints.rs +++ /dev/null @@ -1,359 +0,0 @@ -use super::Config; -use super::context::{SalarymanDContext, SalarymanService}; -use dropshot::{HttpError, HttpResponseOk, Path, RequestContext, TypedBody, endpoint}; -use salaryman::model::{NewService, ServicePath, StdinBuffer, UpdateConf}; -use salaryman::service::{Service, ServiceConf}; -use std::sync::Arc; -use tokio::fs::File; -use tokio::io::AsyncWriteExt; -use tokio::sync::RwLock; - -#[endpoint { - method = GET, - path = "/config", -}] -pub async fn endpoint_get_config( - rqctx: RequestContext<Arc<SalarymanDContext>>, -) -> Result<HttpResponseOk<Config>, HttpError> { - let ctx = rqctx.context(); - let lock = ctx.config.read().await; - let conf = lock.clone(); - drop(lock); - Ok(HttpResponseOk(conf)) -} -#[endpoint { - method = PUT, - path = "/config", -}] -pub async fn endpoint_put_config( - rqctx: RequestContext<Arc<SalarymanDContext>>, - body: TypedBody<UpdateConf>, -) -> Result<HttpResponseOk<()>, HttpError> { - let ctx = rqctx.context(); - let update = body.into_inner(); - if let Some(addr) = update.address { - let mut lock = ctx.config.write().await; - lock.address = Some(addr); - drop(lock); - } - if let Some(port) = update.port { - let mut lock = ctx.config.write().await; - lock.port = Some(port); - drop(lock); - } - Ok(HttpResponseOk(())) -} -#[endpoint { - method = GET, - path = "/config/save" -}] -pub async fn endpoint_get_config_save( - rqctx: RequestContext<Arc<SalarymanDContext>>, -) -> Result<HttpResponseOk<()>, HttpError> { - let ctx = rqctx.context(); - let mut v: Vec<ServiceConf> = Vec::new(); - let rlock = ctx.services.read().await; - for i in 0..rlock.len() { - v.push(rlock[i].config.clone()); - } - drop(rlock); - let rlock = ctx.config.read().await; - let save = Config { - address: rlock.address, - port: rlock.port, - user: rlock.user.clone(), - service: v, - }; - drop(rlock); - let mut f = match File::open(&ctx.save_file).await { - Ok(f) => f, - Err(_) => { - return Err(HttpError::for_internal_error(String::from( - "cannot open desired file", - ))); - } - }; - let save = match toml::to_string(&save) { - Ok(s) => s, - Err(_) => { - return Err(HttpError::for_internal_error(String::from( - "cannot serialize config!", - ))); - } - }; - match f.write_all(save.as_str().as_bytes()).await { - Ok(_) => (), - Err(_) => { - return Err(HttpError::for_internal_error(String::from( - "could not write to file!", - ))); - } - } - Ok(HttpResponseOk(())) -} -#[endpoint { - method = GET, - path = "/service", -}] -pub async fn endpoint_get_services( - rqctx: RequestContext<Arc<SalarymanDContext>>, -) -> Result<HttpResponseOk<Vec<ServiceConf>>, HttpError> { - let ret = { - let ctx = rqctx.context(); - let mut v: Vec<ServiceConf> = Vec::new(); - let lock = ctx.services.read().await; - for i in 0..lock.len() { - v.push(lock[i].config.clone()); - } - v - }; - Ok(HttpResponseOk(ret)) -} -#[endpoint { - method = POST, - path = "/service", -}] -pub async fn endpoint_post_service( - rqctx: RequestContext<Arc<SalarymanDContext>>, - body: TypedBody<NewService>, -) -> Result<HttpResponseOk<ServiceConf>, HttpError> { - let ctx = rqctx.context(); - let body = body.into_inner(); - let mut s: ServiceConf = ServiceConf::new(); - if let Some(name) = &body.name { - s.name = name.clone().to_owned(); - } else { - return Err(HttpError::for_bad_request( - None, - String::from("name field is required!"), - )); - } - if let Some(command) = &body.command { - s.command = command.clone().to_owned(); - } else { - return Err(HttpError::for_bad_request( - None, - String::from("command field is required!"), - )); - } - if let Some(args) = &body.args { - if let Some(args) = args { - s.args = Some(args.clone().to_owned()); - } - } - if let Some(dir) = &body.directory { - if let Some(dir) = dir { - s.directory = Some(dir.clone().to_owned()); - } - } - if let Some(auto) = &body.autostart { - s.autostart = auto.clone().to_owned(); - } else { - s.autostart = false; - } - let service: SalarymanService = - SalarymanService::from_parts(s.clone(), Arc::new(RwLock::new(Service::from_conf(&s)))); - if service.config.autostart { - let mut lock = service.service.write().await; - match lock.start_with_output().await { - Ok(_) => (), - Err(_) => (), - } - drop(lock); - } - let mut lock = ctx.config.write().await; - lock.service.push(s.clone()); - drop(lock); - let mut lock = ctx.services.write().await; - lock.push(Arc::new(service)); - drop(lock); - Ok(HttpResponseOk(ServiceConf::new())) -} -#[endpoint { - method = GET, - path = "/service/{service_uuid}", -}] -pub async fn endpoint_get_service( - rqctx: RequestContext<Arc<SalarymanDContext>>, - path_params: Path<ServicePath>, -) -> Result<HttpResponseOk<ServiceConf>, HttpError> { - let u = path_params.into_inner().service_uuid; - let ctx = rqctx.context(); - let mut service: Option<Arc<SalarymanService>> = None; - let lock = ctx.services.read().await; - for i in 0..lock.len() { - if lock[i].config.uuid == u { - service = Some(lock[i].clone()); - } else { - continue; - } - } - let s = match service { - Some(s) => s.config.clone(), - None => { - return Err(HttpError::for_unavail( - None, - String::from("Service Not Found"), - )); - } - }; - Ok(HttpResponseOk(s)) -} -#[endpoint { - method = GET, - path = "/service/{service_uuid}/start", -}] -pub async fn endpoint_start_service( - rqctx: RequestContext<Arc<SalarymanDContext>>, - path_params: Path<ServicePath>, -) -> Result<HttpResponseOk<()>, HttpError> { - let u = path_params.into_inner().service_uuid; - let ctx = rqctx.context(); - let mut service: Option<Arc<SalarymanService>> = None; - let lock = ctx.services.read().await; - for i in 0..lock.len() { - if lock[i].config.uuid == u { - service = Some(lock[i].clone()); - } else { - continue; - } - } - match service { - Some(s) => { - let mut lock = s.service.write().await; - match lock.start_with_output().await { - Ok(_) => (), - Err(e) => return Err(HttpError::for_internal_error(e.to_string())), - } - } - None => { - return Err(HttpError::for_unavail( - None, - String::from("Service Not Found"), - )); - } - }; - Ok(HttpResponseOk(())) -} -#[endpoint { - method = GET, - path = "/service/{service_uuid}/stop", -}] -pub async fn endpoint_stop_service( - rqctx: RequestContext<Arc<SalarymanDContext>>, - path_params: Path<ServicePath>, -) -> Result<HttpResponseOk<()>, HttpError> { - let u = path_params.into_inner().service_uuid; - let ctx = rqctx.context(); - let mut service: Option<Arc<SalarymanService>> = None; - let lock = ctx.services.read().await; - for i in 0..lock.len() { - if lock[i].config.uuid == u { - service = Some(lock[i].clone()); - } else { - continue; - } - } - match service { - Some(s) => { - let mut lock = s.service.write().await; - match lock.stop().await { - Ok(_) => (), - Err(e) => return Err(HttpError::for_internal_error(e.to_string())), - } - } - None => { - return Err(HttpError::for_unavail( - None, - String::from("Service Not Found"), - )); - } - }; - Ok(HttpResponseOk(())) -} -#[endpoint { - method = GET, - path = "/service/{service_uuid}/restart", -}] -pub async fn endpoint_restart_service( - rqctx: RequestContext<Arc<SalarymanDContext>>, - path_params: Path<ServicePath>, -) -> Result<HttpResponseOk<()>, HttpError> { - let u = path_params.into_inner().service_uuid; - let ctx = rqctx.context(); - let mut service: Option<Arc<SalarymanService>> = None; - let lock = ctx.services.read().await; - for i in 0..lock.len() { - if lock[i].config.uuid == u { - service = Some(lock[i].clone()); - } else { - continue; - } - } - match service { - Some(s) => { - let mut lock = s.service.write().await; - match lock.restart_with_output().await { - Ok(_) => (), - Err(e) => return Err(HttpError::for_internal_error(e.to_string())), - } - } - None => { - return Err(HttpError::for_unavail( - None, - String::from("Service Not Found"), - )); - } - }; - Ok(HttpResponseOk(())) -} -#[endpoint { - method = PUT, - path = "/service/{service_uuid}/write" -}] -pub async fn endpoint_post_stdin( - rqctx: RequestContext<Arc<SalarymanDContext>>, - path_params: Path<ServicePath>, - update: TypedBody<StdinBuffer>, -) -> Result<HttpResponseOk<()>, HttpError> { - let ctx = rqctx.context(); - let stdin_str = update.into_inner(); - let u = path_params.into_inner().service_uuid; - let mut service: Option<Arc<SalarymanService>> = None; - let lock = ctx.services.read().await; - for i in 0..lock.len() { - if lock[i].config.uuid == u { - service = Some(lock[i].clone()); - } else { - continue; - } - } - match service { - Some(s) => { - let mut lock = s.service.write().await; - if lock.started().await { - let b = if let Some(endl) = stdin_str.endl { - if endl { - lock.writeln_stdin(stdin_str.stdin.clone()).await //TODO: PROPERLY HANDLE ERROR! - } else { - lock.write_stdin(stdin_str.stdin.clone()).await //TODO: PROPERLY HANDLE ERROR! - } - } else { - lock.writeln_stdin(stdin_str.stdin.clone()).await //TODO: PROPERLY HANDLE ERROR! - }; - match b { - Ok(_) => (), - Err(e) => return Err(HttpError::for_internal_error(e.to_string())), - } - } - drop(lock); - } - None => { - return Err(HttpError::for_unavail( - None, - String::from("Service Not Found"), - )); - } - } - Ok(HttpResponseOk(())) -} diff --git a/src/server/main.rs b/src/server/main.rs index 7557145..b769fbf 100644 --- a/src/server/main.rs +++ b/src/server/main.rs @@ -1,25 +1,12 @@ -mod context; -mod endpoints; - use clap::Parser; -use dropshot::{ApiDescription, ConfigDropshot, ConfigLogging, ConfigLoggingLevel, ServerBuilder}; -use salaryman::service::{Service, ServiceConf}; -use schemars::JsonSchema; +use salaryman::service::{Service, ServiceConf, ServiceState}; use serde::{Deserialize, Serialize}; -use tokio::{fs::read_to_string, sync::RwLock}; - use std::{ - net::{IpAddr, SocketAddr}, + fs::read_to_string, + os::unix::net::{UnixListener, UnixStream}, path::PathBuf, - sync::Arc, -}; - -use crate::context::{SalarymanDContext, SalarymanService}; -use crate::endpoints::{ - endpoint_get_config, endpoint_get_config_save, endpoint_get_service, endpoint_get_services, - endpoint_post_service, endpoint_post_stdin, endpoint_put_config, endpoint_restart_service, - endpoint_start_service, endpoint_stop_service, }; +use rayon::prelude::*; #[derive(Parser, Debug)] #[command(version, about, long_about = None)] @@ -35,47 +22,33 @@ struct Args { #[arg( short, long, - value_name = "ADDR", - help = "IP address to bind API to", - default_value = "127.0.0.1" + value_name = "SOCK", + help = "UNIX socket to bind", + default_value = "/tmp/salaryman.sock" )] - address: IpAddr, - #[arg( - short, - long, - value_name = "PORT", - help = "TCP Port to bind API to", - default_value = "3080" - )] - port: u16, + socket: PathBuf, } -#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug)] -pub struct User { - pub username: String, - pub token: String, +pub enum ServiceReq { + Create(ServiceConf), } -#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct Config { - pub address: Option<IpAddr>, - pub port: Option<u16>, - pub user: Vec<User>, + pub socket: Option<PathBuf>, pub service: Vec<ServiceConf>, } impl Config { pub fn new() -> Self { Self { - address: None, - port: None, - user: Vec::new(), + socket: None, service: Vec::new(), } } } -async fn load_config(file: &PathBuf) -> Result<Config, Box<dyn std::error::Error>> { - let s: String = match read_to_string(file).await { +fn load_config(file: &PathBuf) -> Result<Config, Box<dyn std::error::Error>> { + let s: String = match read_to_string(file) { Ok(s) => s, Err(_) => { return Err(Box::new(std::io::Error::new( @@ -93,70 +66,28 @@ async fn load_config(file: &PathBuf) -> Result<Config, Box<dyn std::error::Error } } -#[tokio::main] -async fn main() -> Result<(), Box<dyn std::error::Error>> { +fn main() -> Result<(), Box<dyn std::error::Error>> { let args = Args::parse(); - let conf: Config = load_config(&args.config).await?; - let addr = if let Some(addr) = conf.address { - addr - } else { - args.address - }; - let port = if let Some(port) = conf.port { - port + let conf: Config = load_config(&args.config)?; + let _sockaddr = if let Some(sock) = conf.socket { + sock } else { - args.port + args.socket }; - let bind = SocketAddr::new(addr, port); - let services: RwLock<Vec<Arc<SalarymanService>>> = RwLock::new(Vec::new()); - for i in 0..conf.service.len() { - let mut lock = services.write().await; - lock.push(Arc::new(SalarymanService::from_parts( - conf.service[i].clone(), - Arc::new(RwLock::new(Service::from_conf(&conf.service[i]))), - ))); - drop(lock); + let mut services: Vec<Service> = Vec::new(); + for service in conf.service { + services.push(service.build()?); } - let lock = services.write().await; - for i in 0..lock.len() { - if lock[i].config.autostart { - let mut l = lock[i].service.write().await; - l.start().await?; - l.scan_stdout().await?; - l.scan_stderr().await?; - drop(l); - } + loop { + services.par_iter_mut() + .for_each(|service| { + match service.state().expect("unable to get service state") { + ServiceState::Failed => service.restart().expect("unable to restart service"), + ServiceState::Stopped => (), + _ => (), + } + }); + services.push(Service::new()); + std::thread::sleep(std::time::Duration::from_millis(100)); } - drop(lock); - let log_conf = ConfigLogging::StderrTerminal { - level: ConfigLoggingLevel::Info, - }; - let log = log_conf.to_logger("smd")?; - let ctx = Arc::new(SalarymanDContext::from_parts( - services, - args.config, - Arc::new(RwLock::new(conf)), - )); - let config = ConfigDropshot { - bind_address: bind, - ..Default::default() - }; - let mut api = ApiDescription::new(); - api.register(endpoint_get_services)?; - api.register(endpoint_get_service)?; - api.register(endpoint_start_service)?; - api.register(endpoint_stop_service)?; - api.register(endpoint_restart_service)?; - api.register(endpoint_post_stdin)?; - api.register(endpoint_post_service)?; - api.register(endpoint_get_config)?; - api.register(endpoint_put_config)?; - api.register(endpoint_get_config_save)?; - api.openapi("Salaryman", semver::Version::new(1, 0, 0)) - .write(&mut std::io::stdout())?; - let server = ServerBuilder::new(api, ctx.clone(), log) - .config(config) - .start()?; - server.await?; - Ok(()) } diff --git a/src/service.rs b/src/service.rs index 22cf9c4..28e4401 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,26 +1,26 @@ -use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use tokio::{ - io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, - process::{Child, Command}, - sync::{ - Mutex, - mpsc::{Receiver, channel}, - }, - task::spawn, +use std::{ + fs::File, + io::{BufRead, BufReader, Write}, + path::PathBuf, + process::{Child, Command, Stdio}, }; use uuid::Uuid; -use std::{path::PathBuf, process::Stdio, sync::Arc}; +pub enum ServiceState { + Running, + Failed, + Stopped, +} -#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct ServiceConf { - pub uuid: Uuid, - pub name: String, - pub command: String, - pub args: Option<String>, - pub directory: Option<PathBuf>, - pub autostart: bool, + uuid: Uuid, + name: String, + command: String, + args: Option<String>, + directory: Option<PathBuf>, + autostart: bool, } impl Default for ServiceConf { fn default() -> Self { @@ -28,9 +28,7 @@ impl Default for ServiceConf { } } impl ServiceConf { - /** - * Returns a new empty `ServiceConf` - */ + /// Returns a new empty `ServiceConf` pub fn new() -> Self { Self { uuid: Uuid::new_v4(), @@ -41,9 +39,7 @@ impl ServiceConf { autostart: false, } } - /** - * Returns a new `ServiceConf` from parts. - */ + /// Returns a new `ServiceConf` from parts. pub fn from_parts( uuid: Uuid, name: String, @@ -61,9 +57,7 @@ impl ServiceConf { autostart, } } - /** - * Returns a new `ServiceConf` from parts with new uuid. - */ + /// Returns a new `ServiceConf` from parts with new uuid. pub fn new_from_parts( name: String, command: String, @@ -80,64 +74,147 @@ impl ServiceConf { autostart, } } + /// Returns the `uuid::Uuid` associated with the service config + pub fn get_uuid(&self) -> &Uuid { + &self.uuid + } + /// Returns the name of the described service + pub fn get_name(&self) -> &str { + &self.name + } + /// Returns the command of the described service + pub fn get_command(&self) -> &str { + &self.command + } + /// Returns the args of the described service + pub fn get_args(&self) -> &Option<String> { + &self.args + } + /// Returns the work directory of the described service + pub fn get_work_dir(&self) -> &Option<PathBuf> { + &self.directory + } + /// Returns the autostart status of the described service + pub fn get_autostart(&self) -> bool { + self.autostart + } + /// Sets the name of the described service + pub fn name(&mut self, name: &str) -> &mut Self { + self.name = String::from(name); + self + } + /// Sets the command of the described service + pub fn command(&mut self, command: &str) -> &mut Self { + self.command = String::from(command); + self + } + /// Sets the args of the described service + pub fn args(&mut self, args: &Option<String>) -> &mut Self { + self.args = args.clone(); + self + } + /// Sets the work directory of the described service + pub fn work_dir(&mut self, work_dir: &Option<PathBuf>) -> &mut Self { + self.directory = work_dir.clone(); + self + } + /// Sets the autostart value of the described service + pub fn autostart(&mut self, autostart: bool) -> &mut Self { + self.autostart = autostart; + self + } + /// Builds a Service from this object + #[inline] + pub fn build(&self) -> Result<Service, Box<dyn std::error::Error>> { + Service::from_conf(&self) + } } #[derive(Debug)] pub struct Service { conf: ServiceConf, - proc: Option<Arc<Mutex<Child>>>, - pub stdout: Option<Arc<Mutex<Receiver<String>>>>, - pub stderr: Option<Arc<Mutex<Receiver<String>>>>, + proc: Option<Child>, + pub outpath: Option<PathBuf>, + pub errpath: Option<PathBuf>, } impl Default for Service { fn default() -> Self { Self::new() } } -impl Service { - /** - * Returns a new empty `Service` - */ +impl<'a> Service { + /// Returns a new empty `Service` pub fn new() -> Self { Self { conf: ServiceConf::default(), proc: None, - stdout: None, - stderr: None, + outpath: None, + errpath: None, } } - /** - * Returns a `Service` made from a `ServiceConf`. - */ - pub fn from_conf(conf: &ServiceConf) -> Self { - Self { - conf: conf.clone(), + /// Returns a `Service` made from a `ServiceConf`. + pub fn from_conf(conf: &ServiceConf) -> Result<Self, Box<dyn std::error::Error>> { + let mut service = Self { + conf: conf.to_owned(), proc: None, - stdout: None, - stderr: None, + outpath: None, + errpath: None, + }; + if conf.get_autostart() { + service.start()?; } + Ok(service) + } + /// Gets the ServiceConf associated with the service + #[inline] + pub fn config(&self) -> &ServiceConf { + &self.conf } - /** - * Returns the name of the service - */ - pub async fn name(&self) -> &str { - &self.conf.name + /// Returns the name of the service + #[inline] + pub fn name(&self) -> &str { + &self.config().get_name() + } + #[inline] + fn create_dirs(&self) -> Result<(), Box<dyn std::error::Error>> { + match std::fs::create_dir("./logs") { + Ok(_) => (), + Err(ref e) if e.kind() == std::io::ErrorKind::AlreadyExists => (), + Err(e) => return Err(Box::new(e)), + } + match std::fs::create_dir(format!("./logs/{}", &self.config().get_uuid())) { + Ok(_) => (), + Err(ref e) if e.kind() == std::io::ErrorKind::AlreadyExists => (), + Err(e) => return Err(Box::new(e)), + } + Ok(()) } - /** - * Uses `tokio::process::Command` to start the service. - */ - pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> { + /// Uses `tokio::process::Command` to start the service. + pub fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> { if self.proc.is_some() { return Err(Box::new(std::io::Error::new( std::io::ErrorKind::AlreadyExists, "Process Already Exists", ))); } + self.create_dirs()?; + let outpath = PathBuf::from(format!( + "./logs/{}/{}.log", + &self.config().get_uuid(), + &self.name() + )); + let errpath = PathBuf::from(format!( + "./logs/{}/{}.err", + &self.config().get_uuid(), + &self.name() + )); + let outfile = File::options().append(true).create(true).open(&outpath)?; + let errfile = File::options().append(true).create(true).open(&errpath)?; let cmd = &self.conf.command; let mut proc = Command::new(cmd); proc.stdin(Stdio::piped()); - proc.stdout(Stdio::piped()); - proc.stderr(Stdio::piped()); + proc.stdout(outfile); + proc.stderr(errfile); if let Some(a) = &self.conf.args { proc.args(a.split_whitespace()); }; @@ -145,129 +222,42 @@ impl Service { proc.current_dir(c); }; let child = proc.spawn()?; - self.proc = Some(Arc::new(Mutex::new(child))); + self.proc = Some(child); + self.outpath = Some(outpath); + self.errpath = Some(errpath); Ok(()) } - /** - * Calls self.start(), then self.scan_stdout(), and finally self.scan_stderr() - */ - #[inline] - pub async fn start_with_output(&mut self) -> Result<(), Box<dyn std::error::Error>> { - self.start().await?; - self.scan_stdout().await?; - self.scan_stderr().await?; - Ok(()) - } - //TODO: process monitoring! - /** - * Returns true when process is started and false when process is stopped. - */ - pub async fn started(&self) -> bool { + /// Returns true when process is started and false when process is stopped. + pub fn started(&self) -> bool { self.proc.is_some() } - /** - * Invokes kill on the service process - */ - pub async fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> { - if let Some(proc) = self.proc.clone() { - let mut lock = proc.lock().await; - lock.kill().await?; - drop(lock); - self.proc = None; - self.stdout = None; - self.stderr = None; - Ok(()) + /// Returns the process id + pub fn id(&self) -> Result<u32, Box<dyn std::error::Error>> { + if let Some(proc) = self.proc.as_ref() { + Ok(proc.id()) } else { Err(Box::new(std::io::Error::new( std::io::ErrorKind::NotFound, - "No Process Associated with Service", + "process not started", ))) } } - /** - * Restarts service process - */ - #[inline] - pub async fn restart(&mut self) -> Result<(), Box<dyn std::error::Error>> { - self.stop().await?; - self.start().await?; - Ok(()) - } - /** - * Restarts service process - */ - #[inline] - pub async fn restart_with_output(&mut self) -> Result<(), Box<dyn std::error::Error>> { - self.stop().await?; - self.start_with_output().await?; - Ok(()) - } - /** - * Takes control of service process' stdout file handle and spawns a new task to continuously - * scan it. - */ - pub async fn scan_stdout(&mut self) -> Result<(), Box<dyn std::error::Error>> { - if let Some(proc) = self.proc.clone() { - let mut lock = proc.lock().await; - let stdout = if let Some(stdout) = lock.stdout.take() { - stdout - } else { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::NotFound, - "No stdout handle associated with process", - ))); - }; - drop(lock); - let (tx, rx) = channel(1024); - let sname = self.conf.name.clone(); - let suuid = self.conf.uuid.clone(); - spawn(async move { - let mut br = BufReader::new(stdout).lines(); - while let Ok(Some(line)) = br.next_line().await { - println!("{} ({}) :: {}", &suuid, &sname, &line); - if let Err(_) = tx.send(line).await { - return; - }; - } - }); - self.stdout = Some(Arc::new(Mutex::new(rx))); - Ok(()) + /// Returns the state of the service + pub fn state(&mut self) -> Result<ServiceState, Box<dyn std::error::Error>> { + if let Some(proc) = self.proc.as_mut() { + match proc.try_wait() { + Err(_) | Ok(Some(_)) => Ok(ServiceState::Failed), + Ok(None) => Ok(ServiceState::Running), + } } else { - Err(Box::new(std::io::Error::new( - std::io::ErrorKind::NotFound, - "No Process Associated with Service", - ))) + Ok(ServiceState::Stopped) } } - /** - * Takes control of service process' stderr file handle and spawns a new task to continuously - * scan it. - */ - pub async fn scan_stderr(&mut self) -> Result<(), Box<dyn std::error::Error>> { - if let Some(proc) = self.proc.clone() { - let mut lock = proc.lock().await; - let stderr = if let Some(stderr) = lock.stderr.take() { - stderr - } else { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::NotFound, - "No stderr handle associated with process", - ))); - }; - drop(lock); - let (tx, rx) = channel(1024); - let sname = self.conf.name.clone(); - let suuid = self.conf.uuid.clone(); - spawn(async move { - let mut br = BufReader::new(stderr).lines(); - while let Ok(Some(line)) = br.next_line().await { - eprintln!("{} ({}) >< {}", &suuid, &sname, &line); - if let Err(_) = tx.send(line).await { - return; - }; - } - }); - self.stderr = Some(Arc::new(Mutex::new(rx))); + /// Invokes kill on the service process + pub fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> { + if let Some(proc) = self.proc.as_mut() { + proc.kill()?; + self.proc = None; Ok(()) } else { Err(Box::new(std::io::Error::new( @@ -276,13 +266,17 @@ impl Service { ))) } } - /** - * Writes to the service process' stdin, if it exists. - */ - pub async fn write_stdin(&mut self, buf: String) -> Result<(), Box<dyn std::error::Error>> { - if let Some(proc) = self.proc.clone() { - let mut lock = proc.lock().await; - let stdin = if let Some(stdin) = lock.stdin.as_mut() { + /// Restarts service process + #[inline] + pub fn restart(&mut self) -> Result<(), Box<dyn std::error::Error>> { + self.stop()?; + self.start()?; + Ok(()) + } + /// Writes to the service process' stdin, if it exists. + pub fn write_stdin(&mut self, buf: &str) -> Result<(), Box<dyn std::error::Error>> { + if let Some(proc) = self.proc.as_mut() { + let stdin = if let Some(stdin) = proc.stdin.as_mut() { stdin } else { return Err(Box::new(std::io::Error::new( @@ -290,8 +284,8 @@ impl Service { "No stdin handle associated with process", ))); }; - stdin.write(&buf.as_bytes()).await?; - stdin.flush().await?; + stdin.write(&buf.as_bytes())?; + stdin.flush()?; Ok(()) } else { Err(Box::new(std::io::Error::new( @@ -300,11 +294,9 @@ impl Service { ))) } } - /** - * Writes a line to the service process' stdin, if it exists. - */ + /// Writes a line to the service process' stdin, if it exists. #[inline] - pub async fn writeln_stdin(&mut self, buf: String) -> Result<(), Box<dyn std::error::Error>> { - self.write_stdin(format!("{}\n", buf)).await + pub fn writeln_stdin(&mut self, buf: &str) -> Result<(), Box<dyn std::error::Error>> { + self.write_stdin(&format!("{}\n", buf)) } } |