diff options
author | yuzu <yuzu@b9215c17-b818-4693-b096-d1e41a411fef> | 2025-07-29 09:26:15 +0000 |
---|---|---|
committer | yuzu <yuzu@b9215c17-b818-4693-b096-d1e41a411fef> | 2025-07-29 09:26:15 +0000 |
commit | 40a65f981a664a186a52fc2981b99a6d8a1191d4 (patch) | |
tree | cc448f8521d3caa1f1d6396ad67e9b09bf648b05 /src/service.rs | |
parent | e33f9a59f875edf1240ca80c1014235296ff3cbf (diff) | |
download | salaryman-40a65f981a664a186a52fc2981b99a6d8a1191d4.tar.gz salaryman-40a65f981a664a186a52fc2981b99a6d8a1191d4.tar.bz2 salaryman-40a65f981a664a186a52fc2981b99a6d8a1191d4.zip |
parallel process monitoring get
git-svn-id: svn+ssh://diminuette.aengel.lesbianunix.dev/salaryman/trunk@16 b9215c17-b818-4693-b096-d1e41a411fef
Diffstat (limited to 'src/service.rs')
-rw-r--r-- | src/service.rs | 348 |
1 files changed, 170 insertions, 178 deletions
diff --git a/src/service.rs b/src/service.rs index 22cf9c4..28e4401 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,26 +1,26 @@ -use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use tokio::{ - io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, - process::{Child, Command}, - sync::{ - Mutex, - mpsc::{Receiver, channel}, - }, - task::spawn, +use std::{ + fs::File, + io::{BufRead, BufReader, Write}, + path::PathBuf, + process::{Child, Command, Stdio}, }; use uuid::Uuid; -use std::{path::PathBuf, process::Stdio, sync::Arc}; +pub enum ServiceState { + Running, + Failed, + Stopped, +} -#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct ServiceConf { - pub uuid: Uuid, - pub name: String, - pub command: String, - pub args: Option<String>, - pub directory: Option<PathBuf>, - pub autostart: bool, + uuid: Uuid, + name: String, + command: String, + args: Option<String>, + directory: Option<PathBuf>, + autostart: bool, } impl Default for ServiceConf { fn default() -> Self { @@ -28,9 +28,7 @@ impl Default for ServiceConf { } } impl ServiceConf { - /** - * Returns a new empty `ServiceConf` - */ + /// Returns a new empty `ServiceConf` pub fn new() -> Self { Self { uuid: Uuid::new_v4(), @@ -41,9 +39,7 @@ impl ServiceConf { autostart: false, } } - /** - * Returns a new `ServiceConf` from parts. - */ + /// Returns a new `ServiceConf` from parts. pub fn from_parts( uuid: Uuid, name: String, @@ -61,9 +57,7 @@ impl ServiceConf { autostart, } } - /** - * Returns a new `ServiceConf` from parts with new uuid. - */ + /// Returns a new `ServiceConf` from parts with new uuid. pub fn new_from_parts( name: String, command: String, @@ -80,64 +74,147 @@ impl ServiceConf { autostart, } } + /// Returns the `uuid::Uuid` associated with the service config + pub fn get_uuid(&self) -> &Uuid { + &self.uuid + } + /// Returns the name of the described service + pub fn get_name(&self) -> &str { + &self.name + } + /// Returns the command of the described service + pub fn get_command(&self) -> &str { + &self.command + } + /// Returns the args of the described service + pub fn get_args(&self) -> &Option<String> { + &self.args + } + /// Returns the work directory of the described service + pub fn get_work_dir(&self) -> &Option<PathBuf> { + &self.directory + } + /// Returns the autostart status of the described service + pub fn get_autostart(&self) -> bool { + self.autostart + } + /// Sets the name of the described service + pub fn name(&mut self, name: &str) -> &mut Self { + self.name = String::from(name); + self + } + /// Sets the command of the described service + pub fn command(&mut self, command: &str) -> &mut Self { + self.command = String::from(command); + self + } + /// Sets the args of the described service + pub fn args(&mut self, args: &Option<String>) -> &mut Self { + self.args = args.clone(); + self + } + /// Sets the work directory of the described service + pub fn work_dir(&mut self, work_dir: &Option<PathBuf>) -> &mut Self { + self.directory = work_dir.clone(); + self + } + /// Sets the autostart value of the described service + pub fn autostart(&mut self, autostart: bool) -> &mut Self { + self.autostart = autostart; + self + } + /// Builds a Service from this object + #[inline] + pub fn build(&self) -> Result<Service, Box<dyn std::error::Error>> { + Service::from_conf(&self) + } } #[derive(Debug)] pub struct Service { conf: ServiceConf, - proc: Option<Arc<Mutex<Child>>>, - pub stdout: Option<Arc<Mutex<Receiver<String>>>>, - pub stderr: Option<Arc<Mutex<Receiver<String>>>>, + proc: Option<Child>, + pub outpath: Option<PathBuf>, + pub errpath: Option<PathBuf>, } impl Default for Service { fn default() -> Self { Self::new() } } -impl Service { - /** - * Returns a new empty `Service` - */ +impl<'a> Service { + /// Returns a new empty `Service` pub fn new() -> Self { Self { conf: ServiceConf::default(), proc: None, - stdout: None, - stderr: None, + outpath: None, + errpath: None, } } - /** - * Returns a `Service` made from a `ServiceConf`. - */ - pub fn from_conf(conf: &ServiceConf) -> Self { - Self { - conf: conf.clone(), + /// Returns a `Service` made from a `ServiceConf`. + pub fn from_conf(conf: &ServiceConf) -> Result<Self, Box<dyn std::error::Error>> { + let mut service = Self { + conf: conf.to_owned(), proc: None, - stdout: None, - stderr: None, + outpath: None, + errpath: None, + }; + if conf.get_autostart() { + service.start()?; } + Ok(service) + } + /// Gets the ServiceConf associated with the service + #[inline] + pub fn config(&self) -> &ServiceConf { + &self.conf } - /** - * Returns the name of the service - */ - pub async fn name(&self) -> &str { - &self.conf.name + /// Returns the name of the service + #[inline] + pub fn name(&self) -> &str { + &self.config().get_name() + } + #[inline] + fn create_dirs(&self) -> Result<(), Box<dyn std::error::Error>> { + match std::fs::create_dir("./logs") { + Ok(_) => (), + Err(ref e) if e.kind() == std::io::ErrorKind::AlreadyExists => (), + Err(e) => return Err(Box::new(e)), + } + match std::fs::create_dir(format!("./logs/{}", &self.config().get_uuid())) { + Ok(_) => (), + Err(ref e) if e.kind() == std::io::ErrorKind::AlreadyExists => (), + Err(e) => return Err(Box::new(e)), + } + Ok(()) } - /** - * Uses `tokio::process::Command` to start the service. - */ - pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> { + /// Uses `tokio::process::Command` to start the service. + pub fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> { if self.proc.is_some() { return Err(Box::new(std::io::Error::new( std::io::ErrorKind::AlreadyExists, "Process Already Exists", ))); } + self.create_dirs()?; + let outpath = PathBuf::from(format!( + "./logs/{}/{}.log", + &self.config().get_uuid(), + &self.name() + )); + let errpath = PathBuf::from(format!( + "./logs/{}/{}.err", + &self.config().get_uuid(), + &self.name() + )); + let outfile = File::options().append(true).create(true).open(&outpath)?; + let errfile = File::options().append(true).create(true).open(&errpath)?; let cmd = &self.conf.command; let mut proc = Command::new(cmd); proc.stdin(Stdio::piped()); - proc.stdout(Stdio::piped()); - proc.stderr(Stdio::piped()); + proc.stdout(outfile); + proc.stderr(errfile); if let Some(a) = &self.conf.args { proc.args(a.split_whitespace()); }; @@ -145,129 +222,42 @@ impl Service { proc.current_dir(c); }; let child = proc.spawn()?; - self.proc = Some(Arc::new(Mutex::new(child))); + self.proc = Some(child); + self.outpath = Some(outpath); + self.errpath = Some(errpath); Ok(()) } - /** - * Calls self.start(), then self.scan_stdout(), and finally self.scan_stderr() - */ - #[inline] - pub async fn start_with_output(&mut self) -> Result<(), Box<dyn std::error::Error>> { - self.start().await?; - self.scan_stdout().await?; - self.scan_stderr().await?; - Ok(()) - } - //TODO: process monitoring! - /** - * Returns true when process is started and false when process is stopped. - */ - pub async fn started(&self) -> bool { + /// Returns true when process is started and false when process is stopped. + pub fn started(&self) -> bool { self.proc.is_some() } - /** - * Invokes kill on the service process - */ - pub async fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> { - if let Some(proc) = self.proc.clone() { - let mut lock = proc.lock().await; - lock.kill().await?; - drop(lock); - self.proc = None; - self.stdout = None; - self.stderr = None; - Ok(()) + /// Returns the process id + pub fn id(&self) -> Result<u32, Box<dyn std::error::Error>> { + if let Some(proc) = self.proc.as_ref() { + Ok(proc.id()) } else { Err(Box::new(std::io::Error::new( std::io::ErrorKind::NotFound, - "No Process Associated with Service", + "process not started", ))) } } - /** - * Restarts service process - */ - #[inline] - pub async fn restart(&mut self) -> Result<(), Box<dyn std::error::Error>> { - self.stop().await?; - self.start().await?; - Ok(()) - } - /** - * Restarts service process - */ - #[inline] - pub async fn restart_with_output(&mut self) -> Result<(), Box<dyn std::error::Error>> { - self.stop().await?; - self.start_with_output().await?; - Ok(()) - } - /** - * Takes control of service process' stdout file handle and spawns a new task to continuously - * scan it. - */ - pub async fn scan_stdout(&mut self) -> Result<(), Box<dyn std::error::Error>> { - if let Some(proc) = self.proc.clone() { - let mut lock = proc.lock().await; - let stdout = if let Some(stdout) = lock.stdout.take() { - stdout - } else { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::NotFound, - "No stdout handle associated with process", - ))); - }; - drop(lock); - let (tx, rx) = channel(1024); - let sname = self.conf.name.clone(); - let suuid = self.conf.uuid.clone(); - spawn(async move { - let mut br = BufReader::new(stdout).lines(); - while let Ok(Some(line)) = br.next_line().await { - println!("{} ({}) :: {}", &suuid, &sname, &line); - if let Err(_) = tx.send(line).await { - return; - }; - } - }); - self.stdout = Some(Arc::new(Mutex::new(rx))); - Ok(()) + /// Returns the state of the service + pub fn state(&mut self) -> Result<ServiceState, Box<dyn std::error::Error>> { + if let Some(proc) = self.proc.as_mut() { + match proc.try_wait() { + Err(_) | Ok(Some(_)) => Ok(ServiceState::Failed), + Ok(None) => Ok(ServiceState::Running), + } } else { - Err(Box::new(std::io::Error::new( - std::io::ErrorKind::NotFound, - "No Process Associated with Service", - ))) + Ok(ServiceState::Stopped) } } - /** - * Takes control of service process' stderr file handle and spawns a new task to continuously - * scan it. - */ - pub async fn scan_stderr(&mut self) -> Result<(), Box<dyn std::error::Error>> { - if let Some(proc) = self.proc.clone() { - let mut lock = proc.lock().await; - let stderr = if let Some(stderr) = lock.stderr.take() { - stderr - } else { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::NotFound, - "No stderr handle associated with process", - ))); - }; - drop(lock); - let (tx, rx) = channel(1024); - let sname = self.conf.name.clone(); - let suuid = self.conf.uuid.clone(); - spawn(async move { - let mut br = BufReader::new(stderr).lines(); - while let Ok(Some(line)) = br.next_line().await { - eprintln!("{} ({}) >< {}", &suuid, &sname, &line); - if let Err(_) = tx.send(line).await { - return; - }; - } - }); - self.stderr = Some(Arc::new(Mutex::new(rx))); + /// Invokes kill on the service process + pub fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> { + if let Some(proc) = self.proc.as_mut() { + proc.kill()?; + self.proc = None; Ok(()) } else { Err(Box::new(std::io::Error::new( @@ -276,13 +266,17 @@ impl Service { ))) } } - /** - * Writes to the service process' stdin, if it exists. - */ - pub async fn write_stdin(&mut self, buf: String) -> Result<(), Box<dyn std::error::Error>> { - if let Some(proc) = self.proc.clone() { - let mut lock = proc.lock().await; - let stdin = if let Some(stdin) = lock.stdin.as_mut() { + /// Restarts service process + #[inline] + pub fn restart(&mut self) -> Result<(), Box<dyn std::error::Error>> { + self.stop()?; + self.start()?; + Ok(()) + } + /// Writes to the service process' stdin, if it exists. + pub fn write_stdin(&mut self, buf: &str) -> Result<(), Box<dyn std::error::Error>> { + if let Some(proc) = self.proc.as_mut() { + let stdin = if let Some(stdin) = proc.stdin.as_mut() { stdin } else { return Err(Box::new(std::io::Error::new( @@ -290,8 +284,8 @@ impl Service { "No stdin handle associated with process", ))); }; - stdin.write(&buf.as_bytes()).await?; - stdin.flush().await?; + stdin.write(&buf.as_bytes())?; + stdin.flush()?; Ok(()) } else { Err(Box::new(std::io::Error::new( @@ -300,11 +294,9 @@ impl Service { ))) } } - /** - * Writes a line to the service process' stdin, if it exists. - */ + /// Writes a line to the service process' stdin, if it exists. #[inline] - pub async fn writeln_stdin(&mut self, buf: String) -> Result<(), Box<dyn std::error::Error>> { - self.write_stdin(format!("{}\n", buf)).await + pub fn writeln_stdin(&mut self, buf: &str) -> Result<(), Box<dyn std::error::Error>> { + self.write_stdin(&format!("{}\n", buf)) } } |