diff options
author | yuzu <yuzu@b9215c17-b818-4693-b096-d1e41a411fef> | 2025-07-08 01:49:14 +0000 |
---|---|---|
committer | yuzu <yuzu@b9215c17-b818-4693-b096-d1e41a411fef> | 2025-07-08 01:49:14 +0000 |
commit | 8f05a437d80a243e504b4fb5d26b53bbd7de9c47 (patch) | |
tree | ea57f02f9b5e5a98db9028ca5ed2d2c828985ddd | |
parent | 8880b0afd01bd9afb4a47a76d90e8b90e5cbff1e (diff) | |
download | salaryman-8f05a437d80a243e504b4fb5d26b53bbd7de9c47.tar.gz salaryman-8f05a437d80a243e504b4fb5d26b53bbd7de9c47.tar.bz2 salaryman-8f05a437d80a243e504b4fb5d26b53bbd7de9c47.zip |
go fully async
git-svn-id: svn+ssh://diminuette.aengel.lesbianunix.dev/salaryman/trunk@8 b9215c17-b818-4693-b096-d1e41a411fef
-rw-r--r-- | salaryman.toml | 13 | ||||
-rw-r--r-- | src/lib.rs | 1 | ||||
-rw-r--r-- | src/main.rs | 47 | ||||
-rw-r--r-- | src/model.rs | 276 |
4 files changed, 200 insertions, 137 deletions
diff --git a/salaryman.toml b/salaryman.toml index b0d600b..91cfe9b 100644 --- a/salaryman.toml +++ b/salaryman.toml @@ -2,14 +2,9 @@ address = "0.0.0.0" port = 8080 [[service]] -name = "minecraft" -command = "java" -args = "-jar minecraft_server.jar nogui" -directory = "/home/yuzu/minecraft" +name = "cattest" +command = "cat" +args = "/var/log/messages" +directory = "/" autostart = true -[[service]] -name = "derpcraft" -command = "java" -args = "-jar minecraft_server.jar" -autostart = false diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..65880be --- /dev/null +++ b/src/lib.rs @@ -0,0 +1 @@ +pub mod model; diff --git a/src/main.rs b/src/main.rs index f4e4b1c..09a9376 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,17 +1,11 @@ -pub mod model; - use clap::Parser; use serde::{Deserialize, Serialize}; use tokio::fs::read_to_string; -use std::{ - net::IpAddr, - path::PathBuf, - sync::Arc, -}; +use std::{net::IpAddr, path::PathBuf}; -use crate::model::{Service, ServiceConf}; +use salaryman::model::{Service, ServiceConf}; #[derive(Parser, Debug)] #[command(version, about, long_about = None)] @@ -75,41 +69,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { let conf: Config = load_config(&args.config).await; let mut services: Vec<Service> = Vec::new(); for i in 0..conf.service.len() { - services.push(Service::from_conf(conf.service[i].clone())); + services.push(Service::from_conf(&conf.service[i])); if conf.service[i].autostart { - services[i].start()?; + services[i].start().await?; } } + let mut outs: Vec<(String, tokio::sync::mpsc::Receiver<String>)> = Vec::new(); for i in 0..services.len() { - println!("loop -1"); - if let Ok(s) = services[i].stdout() { - for line in s.lines() { - println!("STDOUT :: [{}] :: {}", services[i].config.name, line); - } - } - if let Ok(s) = services[i].stderr() { - for line in s.lines() { - println!("STDERR :: [{}] :: {}", services[i].config.name, line); - } + if services[i].started().await { + outs.push((services[i].name().await, services[i].scan_stdout().await?)); } } - for e in 0..100 { - println!("loop {e}"); - for i in 0..services.len() { - if let Ok(s) = services[i].stdout() { - for line in s.lines() { - println!("STDOUT :: [{}] :: {}", services[i].config.name, line); - } - } - if let Ok(s) = services[i].stderr() { - for line in s.lines() { - println!("STDERR :: [{}] :: {}", services[i].config.name, line); - } + for _i in 0..100 { + for out in 0..outs.len() { + if let Some(s) = outs[out].1.recv().await { + println!("got line from {} :: {}", outs[out].0, s); } - }; + } } for mut service in services { - match service.stop() { + match service.stop().await { Ok(_) => println!("lol it was killed"), Err(_) => println!("it either didn't exist, or failed to kill"), } diff --git a/src/model.rs b/src/model.rs index 52408fc..8f4416b 100644 --- a/src/model.rs +++ b/src/model.rs @@ -1,22 +1,33 @@ use serde::{Deserialize, Serialize}; - -use std::{ - fs::canonicalize, - io::{Read, BufRead, BufReader}, - path::PathBuf, - process::{Child, Command, Stdio}, - sync::{Arc, Mutex}, +use tokio::{ + io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, + process::{Child, Command}, + sync::{ + Mutex, + mpsc::{Receiver, channel}, + }, + task::spawn, }; +use std::{path::PathBuf, process::Stdio, sync::Arc}; + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct ServiceConf { pub name: String, - command: String, - args: Option<String>, - directory: Option<PathBuf>, + pub command: String, + pub args: Option<String>, + pub directory: Option<PathBuf>, pub autostart: bool, } +impl Default for ServiceConf { + fn default() -> Self { + Self::new() + } +} impl ServiceConf { + /** + * Returns a new empty `ServiceConf` + */ pub fn new() -> Self { Self { name: String::new(), @@ -26,6 +37,9 @@ impl ServiceConf { autostart: false, } } + /** + * Returns a new `ServiceConf` from parts. + */ pub fn from_parts( name: String, command: String, @@ -45,126 +59,200 @@ impl ServiceConf { #[derive(Debug)] pub struct Service { - pub config: ServiceConf, - process: Option<Mutex<Child>>, + conf: ServiceConf, + proc: Option<Arc<Mutex<Child>>>, + stdout: Option<Arc<Mutex<Receiver<String>>>>, + stderr: Option<Arc<Mutex<Receiver<String>>>>, +} +impl Default for Service { + fn default() -> Self { + Self::new() + } } impl Service { + /** + * Returns a new empty `Service` + */ pub fn new() -> Self { Self { - config: ServiceConf::new(), - process: None, + conf: ServiceConf::default(), + proc: None, + stdout: None, + stderr: None, } } - pub fn from_conf(config: ServiceConf) -> Self { + /** + * Returns a `Service` made from a `ServiceConf`. + */ + pub fn from_conf(conf: &ServiceConf) -> Self { Self { - config, - process: None, + conf: conf.clone(), + proc: None, + stdout: None, + stderr: None, } } - pub fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> { - if self.process.is_some() { + /** + * Returns the name of the service + */ + pub async fn name(&self) -> &str { + &self.conf.name + } + /** + * Uses `tokio::process::Command` to start the service. + */ + pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> { + if self.proc.is_some() { return Err(Box::new(std::io::Error::new( std::io::ErrorKind::AlreadyExists, - "process already exists!", + "Process Already Exists", ))); } - let command: &str = &self.config.command.as_str(); - let args: Vec<String> = if let Some(a) = &self.config.args { - let mut v: Vec<String> = Vec::new(); - for arg in a.split_whitespace() { - v.push(String::from(arg)); - } - v + let cmd = &self.conf.command; + let args = if let Some(a) = &self.conf.args { + a.split_whitespace() } else { - Vec::new() + "".split_whitespace() }; - if let Some(cwd) = &self.config.directory { - let child = Command::new(command) - .args(args) - .current_dir(canonicalize(cwd)?) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn()?; - self.process = Some(Mutex::new(child)); + let cwd = if let Some(c) = &self.conf.directory { + c } else { - let child = Command::new(command) - .args(args) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn()?; - self.process = Some(Mutex::new(child)); + &PathBuf::from("/") }; + let child = Command::new(cmd) + .args(args) + .current_dir(cwd) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + self.proc = Some(Arc::new(Mutex::new(child))); Ok(()) } - pub fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> { - if let Some(process) = &self.process { - if let Ok(mut process) = process.lock() { - process.kill()?; - } else { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::ResourceBusy, - "cannot acquire lock", - ))) - } + /** + * Returns true when process is started and false when process is stopped. + */ + pub async fn started(&self) -> bool { + self.proc.is_some() + } + /** + * Invokes kill on the service process + */ + pub async fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> { + if let Some(proc) = self.proc.clone() { + let mut lock = proc.lock().await; + lock.kill().await?; + drop(lock); + self.proc = None; + Ok(()) } else { - return Err(Box::new(std::io::Error::new( + Err(Box::new(std::io::Error::new( std::io::ErrorKind::NotFound, - "process already exists!", - ))); - }; - self.process = None; + "No Process Associated with Service", + ))) + } + } + /** + * Restarts service process + */ + pub async fn restart(&mut self) -> Result<(), Box<dyn std::error::Error>> { + self.stop().await?; + self.start().await?; Ok(()) } - //TODO: this function needs to fork and do message passing via mpsc channels. - pub fn stdout(&mut self) -> Result<String, Box<dyn std::error::Error>> { - if let Some(process) = &mut self.process { - if let Ok(mut process) = process.lock() { - if let Some(stdout) = process.stdout.as_mut() { - let reader = BufReader::new(stdout); - Ok(String::from(reader.lines().filter_map(|line| line.ok()).collect::<String>())) - } else { - Err(Box::new(std::io::Error::new( - std::io::ErrorKind::ResourceBusy, - "cannot acquire stdout", - ))) - } + /** + * Takes control of service process' stdout file handle and spawns a new task to continuously + * scan it. + */ + pub async fn scan_stdout(&mut self) -> Result<(), Box<dyn std::error::Error>> { + if let Some(proc) = self.proc.clone() { + let mut lock = proc.lock().await; + let stdout = if let Some(stdout) = lock.stdout.take() { + stdout } else { - Err(Box::new(std::io::Error::new( - std::io::ErrorKind::ResourceBusy, - "cannot acquire lock", - ))) - } + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::NotFound, + "No stdout handle associated with process", + ))); + }; + drop(lock); + let (tx, rx) = channel(100); + let sname = self.conf.name.clone(); + spawn(async move { + let mut br = BufReader::new(stdout).lines(); + while let Ok(Some(line)) = br.next_line().await { + println!("{} :: {}", &sname, &line); + if let Err(_) = tx.send(line).await { + return; + }; + } + }); + self.stdout = Some(Arc::new(Mutex::new(rx))); + Ok(()) } else { Err(Box::new(std::io::Error::new( std::io::ErrorKind::NotFound, - "is the process started?", + "No Process Associated with Service", ))) } } - //TODO: this function needs to fork and do message passing via mpsc channels. - //TODO: this function needs to use a bufreader instead - pub fn stderr(&mut self) -> Result<String, Box<dyn std::error::Error>> { - if let Some(process) = &mut self.process { - if let Ok(mut process) = process.lock() { - let mut s = String::new(); - if let Some(stderr) = process.stderr.as_mut() { - stderr.read_to_string(&mut s)?; + /** + * Takes control of service process' stderr file handle and spawns a new task to continuously + * scan it. + */ + pub async fn scan_stderr(&mut self) -> Result<(), Box<dyn std::error::Error>> { + if let Some(proc) = self.proc.clone() { + let mut lock = proc.lock().await; + let stderr = if let Some(stderr) = lock.stderr.take() { + stderr + } else { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::NotFound, + "No stderr handle associated with process", + ))); + }; + drop(lock); + let (tx, rx) = channel(100); + let sname = self.conf.name.clone(); + spawn(async move { + let mut br = BufReader::new(stderr).lines(); + while let Ok(Some(line)) = br.next_line().await { + println!("ERR :: {} :: {}", &sname, &line); + if let Err(_) = tx.send(line).await { + return; + }; } - Ok(s) + }); + self.stderr = Some(Arc::new(Mutex::new(rx))); + Ok(()) + } else { + Err(Box::new(std::io::Error::new( + std::io::ErrorKind::NotFound, + "No Process Associated with Service", + ))) + } + } + /** + * Writes to the service process' stdin, if it exists. + */ + pub async fn write_stdin(&mut self, buf: String) -> Result<(), Box<dyn std::error::Error>> { + if let Some(proc) = self.proc.clone() { + let mut lock = proc.lock().await; + let stdin = if let Some(stdin) = lock.stdin.as_mut() { + stdin } else { - Err(Box::new(std::io::Error::new( - std::io::ErrorKind::ResourceBusy, - "cannot acquire lock", - ))) - } + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::NotFound, + "No stdin handle associated with process", + ))); + }; + stdin.write(&buf.as_bytes()).await?; + Ok(()) } else { Err(Box::new(std::io::Error::new( std::io::ErrorKind::NotFound, - "is the process started?", + "No Process Associated with Service", ))) } } } - |