diff options
author | yuzu <yuzu@b9215c17-b818-4693-b096-d1e41a411fef> | 2025-07-08 01:49:14 +0000 |
---|---|---|
committer | yuzu <yuzu@b9215c17-b818-4693-b096-d1e41a411fef> | 2025-07-08 01:49:14 +0000 |
commit | 8f05a437d80a243e504b4fb5d26b53bbd7de9c47 (patch) | |
tree | ea57f02f9b5e5a98db9028ca5ed2d2c828985ddd /src/model.rs | |
parent | 8880b0afd01bd9afb4a47a76d90e8b90e5cbff1e (diff) | |
download | salaryman-8f05a437d80a243e504b4fb5d26b53bbd7de9c47.tar.gz salaryman-8f05a437d80a243e504b4fb5d26b53bbd7de9c47.tar.bz2 salaryman-8f05a437d80a243e504b4fb5d26b53bbd7de9c47.zip |
go fully async
git-svn-id: svn+ssh://diminuette.aengel.lesbianunix.dev/salaryman/trunk@8 b9215c17-b818-4693-b096-d1e41a411fef
Diffstat (limited to 'src/model.rs')
-rw-r--r-- | src/model.rs | 276 |
1 files changed, 182 insertions, 94 deletions
diff --git a/src/model.rs b/src/model.rs index 52408fc..8f4416b 100644 --- a/src/model.rs +++ b/src/model.rs @@ -1,22 +1,33 @@ use serde::{Deserialize, Serialize}; - -use std::{ - fs::canonicalize, - io::{Read, BufRead, BufReader}, - path::PathBuf, - process::{Child, Command, Stdio}, - sync::{Arc, Mutex}, +use tokio::{ + io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, + process::{Child, Command}, + sync::{ + Mutex, + mpsc::{Receiver, channel}, + }, + task::spawn, }; +use std::{path::PathBuf, process::Stdio, sync::Arc}; + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct ServiceConf { pub name: String, - command: String, - args: Option<String>, - directory: Option<PathBuf>, + pub command: String, + pub args: Option<String>, + pub directory: Option<PathBuf>, pub autostart: bool, } +impl Default for ServiceConf { + fn default() -> Self { + Self::new() + } +} impl ServiceConf { + /** + * Returns a new empty `ServiceConf` + */ pub fn new() -> Self { Self { name: String::new(), @@ -26,6 +37,9 @@ impl ServiceConf { autostart: false, } } + /** + * Returns a new `ServiceConf` from parts. + */ pub fn from_parts( name: String, command: String, @@ -45,126 +59,200 @@ impl ServiceConf { #[derive(Debug)] pub struct Service { - pub config: ServiceConf, - process: Option<Mutex<Child>>, + conf: ServiceConf, + proc: Option<Arc<Mutex<Child>>>, + stdout: Option<Arc<Mutex<Receiver<String>>>>, + stderr: Option<Arc<Mutex<Receiver<String>>>>, +} +impl Default for Service { + fn default() -> Self { + Self::new() + } } impl Service { + /** + * Returns a new empty `Service` + */ pub fn new() -> Self { Self { - config: ServiceConf::new(), - process: None, + conf: ServiceConf::default(), + proc: None, + stdout: None, + stderr: None, } } - pub fn from_conf(config: ServiceConf) -> Self { + /** + * Returns a `Service` made from a `ServiceConf`. + */ + pub fn from_conf(conf: &ServiceConf) -> Self { Self { - config, - process: None, + conf: conf.clone(), + proc: None, + stdout: None, + stderr: None, } } - pub fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> { - if self.process.is_some() { + /** + * Returns the name of the service + */ + pub async fn name(&self) -> &str { + &self.conf.name + } + /** + * Uses `tokio::process::Command` to start the service. + */ + pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> { + if self.proc.is_some() { return Err(Box::new(std::io::Error::new( std::io::ErrorKind::AlreadyExists, - "process already exists!", + "Process Already Exists", ))); } - let command: &str = &self.config.command.as_str(); - let args: Vec<String> = if let Some(a) = &self.config.args { - let mut v: Vec<String> = Vec::new(); - for arg in a.split_whitespace() { - v.push(String::from(arg)); - } - v + let cmd = &self.conf.command; + let args = if let Some(a) = &self.conf.args { + a.split_whitespace() } else { - Vec::new() + "".split_whitespace() }; - if let Some(cwd) = &self.config.directory { - let child = Command::new(command) - .args(args) - .current_dir(canonicalize(cwd)?) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn()?; - self.process = Some(Mutex::new(child)); + let cwd = if let Some(c) = &self.conf.directory { + c } else { - let child = Command::new(command) - .args(args) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn()?; - self.process = Some(Mutex::new(child)); + &PathBuf::from("/") }; + let child = Command::new(cmd) + .args(args) + .current_dir(cwd) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + self.proc = Some(Arc::new(Mutex::new(child))); Ok(()) } - pub fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> { - if let Some(process) = &self.process { - if let Ok(mut process) = process.lock() { - process.kill()?; - } else { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::ResourceBusy, - "cannot acquire lock", - ))) - } + /** + * Returns true when process is started and false when process is stopped. + */ + pub async fn started(&self) -> bool { + self.proc.is_some() + } + /** + * Invokes kill on the service process + */ + pub async fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> { + if let Some(proc) = self.proc.clone() { + let mut lock = proc.lock().await; + lock.kill().await?; + drop(lock); + self.proc = None; + Ok(()) } else { - return Err(Box::new(std::io::Error::new( + Err(Box::new(std::io::Error::new( std::io::ErrorKind::NotFound, - "process already exists!", - ))); - }; - self.process = None; + "No Process Associated with Service", + ))) + } + } + /** + * Restarts service process + */ + pub async fn restart(&mut self) -> Result<(), Box<dyn std::error::Error>> { + self.stop().await?; + self.start().await?; Ok(()) } - //TODO: this function needs to fork and do message passing via mpsc channels. - pub fn stdout(&mut self) -> Result<String, Box<dyn std::error::Error>> { - if let Some(process) = &mut self.process { - if let Ok(mut process) = process.lock() { - if let Some(stdout) = process.stdout.as_mut() { - let reader = BufReader::new(stdout); - Ok(String::from(reader.lines().filter_map(|line| line.ok()).collect::<String>())) - } else { - Err(Box::new(std::io::Error::new( - std::io::ErrorKind::ResourceBusy, - "cannot acquire stdout", - ))) - } + /** + * Takes control of service process' stdout file handle and spawns a new task to continuously + * scan it. + */ + pub async fn scan_stdout(&mut self) -> Result<(), Box<dyn std::error::Error>> { + if let Some(proc) = self.proc.clone() { + let mut lock = proc.lock().await; + let stdout = if let Some(stdout) = lock.stdout.take() { + stdout } else { - Err(Box::new(std::io::Error::new( - std::io::ErrorKind::ResourceBusy, - "cannot acquire lock", - ))) - } + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::NotFound, + "No stdout handle associated with process", + ))); + }; + drop(lock); + let (tx, rx) = channel(100); + let sname = self.conf.name.clone(); + spawn(async move { + let mut br = BufReader::new(stdout).lines(); + while let Ok(Some(line)) = br.next_line().await { + println!("{} :: {}", &sname, &line); + if let Err(_) = tx.send(line).await { + return; + }; + } + }); + self.stdout = Some(Arc::new(Mutex::new(rx))); + Ok(()) } else { Err(Box::new(std::io::Error::new( std::io::ErrorKind::NotFound, - "is the process started?", + "No Process Associated with Service", ))) } } - //TODO: this function needs to fork and do message passing via mpsc channels. - //TODO: this function needs to use a bufreader instead - pub fn stderr(&mut self) -> Result<String, Box<dyn std::error::Error>> { - if let Some(process) = &mut self.process { - if let Ok(mut process) = process.lock() { - let mut s = String::new(); - if let Some(stderr) = process.stderr.as_mut() { - stderr.read_to_string(&mut s)?; + /** + * Takes control of service process' stderr file handle and spawns a new task to continuously + * scan it. + */ + pub async fn scan_stderr(&mut self) -> Result<(), Box<dyn std::error::Error>> { + if let Some(proc) = self.proc.clone() { + let mut lock = proc.lock().await; + let stderr = if let Some(stderr) = lock.stderr.take() { + stderr + } else { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::NotFound, + "No stderr handle associated with process", + ))); + }; + drop(lock); + let (tx, rx) = channel(100); + let sname = self.conf.name.clone(); + spawn(async move { + let mut br = BufReader::new(stderr).lines(); + while let Ok(Some(line)) = br.next_line().await { + println!("ERR :: {} :: {}", &sname, &line); + if let Err(_) = tx.send(line).await { + return; + }; } - Ok(s) + }); + self.stderr = Some(Arc::new(Mutex::new(rx))); + Ok(()) + } else { + Err(Box::new(std::io::Error::new( + std::io::ErrorKind::NotFound, + "No Process Associated with Service", + ))) + } + } + /** + * Writes to the service process' stdin, if it exists. + */ + pub async fn write_stdin(&mut self, buf: String) -> Result<(), Box<dyn std::error::Error>> { + if let Some(proc) = self.proc.clone() { + let mut lock = proc.lock().await; + let stdin = if let Some(stdin) = lock.stdin.as_mut() { + stdin } else { - Err(Box::new(std::io::Error::new( - std::io::ErrorKind::ResourceBusy, - "cannot acquire lock", - ))) - } + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::NotFound, + "No stdin handle associated with process", + ))); + }; + stdin.write(&buf.as_bytes()).await?; + Ok(()) } else { Err(Box::new(std::io::Error::new( std::io::ErrorKind::NotFound, - "is the process started?", + "No Process Associated with Service", ))) } } } - |