diff options
author | yuzu <yuzu@b9215c17-b818-4693-b096-d1e41a411fef> | 2025-08-01 08:48:17 +0000 |
---|---|---|
committer | yuzu <yuzu@b9215c17-b818-4693-b096-d1e41a411fef> | 2025-08-01 08:48:17 +0000 |
commit | 2df86268298de8a4961c3296c19b0767565453e8 (patch) | |
tree | 1f048c96efaeb0959e04ad4273a149a108491361 /src | |
parent | 40a65f981a664a186a52fc2981b99a6d8a1191d4 (diff) | |
download | salaryman-2df86268298de8a4961c3296c19b0767565453e8.tar.gz salaryman-2df86268298de8a4961c3296c19b0767565453e8.tar.bz2 salaryman-2df86268298de8a4961c3296c19b0767565453e8.zip |
git-svn-id: svn+ssh://diminuette.aengel.lesbianunix.dev/salaryman/trunk@17 b9215c17-b818-4693-b096-d1e41a411fef
Diffstat (limited to 'src')
-rw-r--r-- | src/lib.rs | 8 | ||||
-rw-r--r-- | src/protocol.rs | 42 | ||||
-rw-r--r-- | src/server/main.rs | 249 | ||||
-rw-r--r-- | src/service.rs | 43 |
4 files changed, 320 insertions, 22 deletions
diff --git a/src/lib.rs b/src/lib.rs index 1f278a4..a0858b5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1 +1,9 @@ pub mod service; +#[cfg(feature = "protocol")] +pub mod protocol; + +// re-exports +pub use self::service::{Service, ServiceConf, ServiceState}; +#[cfg(feature = "protocol")] +pub use self::protocol::SalarymanPacket; + diff --git a/src/protocol.rs b/src/protocol.rs new file mode 100644 index 0000000..a252b9a --- /dev/null +++ b/src/protocol.rs @@ -0,0 +1,42 @@ +use serde::{Serialize, Deserialize}; +use uuid::Uuid; +use super::service::ServiceConf; + +#[derive(Serialize, Deserialize)] +pub enum SalarymanPacket { + Ping, + Pong, + Okay, + Create(ServiceConf), + Delete(Uuid), + Start(Uuid), + Stop(Uuid), + Restart(Uuid), + + Quit, + Invalid, +} +impl SalarymanPacket { + /// Used to encode the enum and any nested structs for network transmission + pub fn serialize(&self) -> Result<Vec<u8>, bincode::error::EncodeError>{ + bincode::serde::encode_to_vec(&self, bincode::config::standard()) + } + /// Used to decode the enum and any nested struct from a network transmission + pub fn deserialize(data: &[u8]) -> Result<SalarymanPacket, bincode::error::DecodeError> { + let (ret, _size) = bincode::serde::decode_from_slice(data, bincode::config::standard())?; + Ok(ret) + } + /// Generates a response from a given SalarymanPacket invariant + pub fn response(&self) -> Self { + match self { + Self::Ping => Self::Pong, + Self::Create(_) => Self::Okay, + Self::Delete(_) => Self::Okay, + Self::Start(_) => Self::Okay, + Self::Stop(_) => Self::Okay, + Self::Restart(_) => Self::Okay, + Self::Quit => Self::Okay, + _ => Self::Invalid, + } + } +} diff --git a/src/server/main.rs b/src/server/main.rs index b769fbf..61edd16 100644 --- a/src/server/main.rs +++ b/src/server/main.rs @@ -1,12 +1,15 @@ use clap::Parser; -use salaryman::service::{Service, ServiceConf, ServiceState}; +use rayon::prelude::*; +use salaryman::{Service, ServiceConf, ServiceState, SalarymanPacket}; use serde::{Deserialize, Serialize}; use std::{ - fs::read_to_string, - os::unix::net::{UnixListener, UnixStream}, + io::{Read, Write}, + fs::{File, read_to_string}, + os::unix::net::UnixListener, path::PathBuf, + sync::mpsc::{TryRecvError, channel}, }; -use rayon::prelude::*; +use uuid::Uuid; #[derive(Parser, Debug)] #[command(version, about, long_about = None)] @@ -16,9 +19,9 @@ struct Args { long, value_name = "FILE", help = "config file override", - default_value = "salaryman.toml" + default_value = "None", )] - config: PathBuf, + config: Option<PathBuf>, #[arg( short, long, @@ -29,8 +32,14 @@ struct Args { socket: PathBuf, } -pub enum ServiceReq { +pub enum InnerProtocol { Create(ServiceConf), + Delete(Uuid), + Start(Uuid), + Stop(Uuid), + Write((Uuid, String)), + Scan, + Quit, } #[derive(Serialize, Deserialize, Clone, Debug)] @@ -66,10 +75,50 @@ fn load_config(file: &PathBuf) -> Result<Config, Box<dyn std::error::Error>> { } } +fn _save_config(file: &PathBuf, conf: &Config) -> Result<(), Box<dyn std::error::Error>> { + let mut f = File::options().create(true).truncate(true).write(true).open(file)?; + f.write(toml::to_string(conf)?.as_bytes())?; + Ok(()) +} + +#[cfg(not(target_os = "windows"))] // We cannot build on Windows fn main() -> Result<(), Box<dyn std::error::Error>> { let args = Args::parse(); - let conf: Config = load_config(&args.config)?; - let _sockaddr = if let Some(sock) = conf.socket { + + // Config Handling + let local_conf = PathBuf::from("salaryman.conf"); + let xdg_conf = PathBuf::from({ + if let Ok(xcd) = std::env::var("XDG_CONFIG_HOME") { + xcd + "/smd/salaryman.toml" + } else { + if let Ok(homedir) = std::env::var("HOME") { + homedir + "/.config/smd/salaryman.conf" + } else { + "/root/.config/smd/salaryman.conf".into() + } + } + }); + let sys_conf = PathBuf::from("/etc/salaryman/smd/salaryman.conf"); + let confpaths = if let Some(path) = args.config.clone() { + vec![path] + } else { + vec![local_conf, xdg_conf, sys_conf] + }; + let get_conf = |confpaths, p: Option<PathBuf>| -> (PathBuf, Config) { + for cfile in confpaths { + if let Ok(conf) = load_config(&cfile) { + return (cfile.to_owned(), conf); + } + } + if let Some(cpath) = p { + (cpath.to_owned(), Config::new()) + } else { + (PathBuf::from("salaryman.toml"), Config::new()) + } + }; + let (config_path, conf) = get_conf(confpaths, args.config.clone()); + + let sockaddr = if let Some(sock) = conf.socket { sock } else { args.socket @@ -78,16 +127,180 @@ fn main() -> Result<(), Box<dyn std::error::Error>> { for service in conf.service { services.push(service.build()?); } - loop { - services.par_iter_mut() - .for_each(|service| { - match service.state().expect("unable to get service state") { - ServiceState::Failed => service.restart().expect("unable to restart service"), - ServiceState::Stopped => (), + // event loop sender + let (etx, erx) = channel(); + // listener + let (ltx, lrx) = channel(); + // main services event loop + let (stx, srx) = channel(); + + // for event loop sender to main services event + let queue = stx.clone(); + let lkill = ltx.clone(); + std::thread::spawn(move || { + loop { // event loop sender + match erx.try_recv() { + Err(TryRecvError::Disconnected) | Ok(true) => { + queue.send(InnerProtocol::Quit).unwrap_or_default(); + lkill.send(true).unwrap_or_default(); + return; + }, + _ => (), + } + queue.send(InnerProtocol::Scan).unwrap_or_default(); + std::thread::sleep(std::time::Duration::from_millis(250)); + } + }); + + let ekill = etx.clone(); + let lkill = ltx.clone(); + std::thread::spawn(move || { + // Main Services Event Loop + loop { + match srx.try_recv() { + Err(TryRecvError::Disconnected) | Ok(InnerProtocol::Quit) => { + services.par_iter_mut() + .for_each(|service| { + if let Err(e) = service.stop() { + eprintln!("unable to start service {} with id {} due to error {e}", &service.name(), &service.uuid()); + } + }); + ekill.send(true).unwrap_or_default(); + lkill.send(true).unwrap_or_default(); + return; + }, + Ok(InnerProtocol::Create(conf)) => if let Ok(s) = conf.build() { services.push(s); }, + Ok(InnerProtocol::Start(u)) => { + services.par_iter_mut() + .filter(|s| s.uuid() == &u) + .for_each(|s| { + if !s.started() { + if let Ok(_) = s.start() { + return; + } + } + }); + }, + Ok(InnerProtocol::Stop(u)) => { + services.par_iter_mut() + .filter(|s| s.uuid() == &u) + .for_each(|s| { + if s.started() { + if let Ok(_) = s.stop() { + return; + } + } + }); + }, + Ok(InnerProtocol::Write((u, buf))) => { + services.par_iter_mut() + .filter(|s| s.uuid() == &u) + .for_each(|s| { + if let Ok(_) = s.write_stdin(&buf) { + return; + } + }); + }, + Ok(InnerProtocol::Delete(u)) => { + if let Some(i) = services.par_iter_mut() + .position_first(|s| s.uuid() == &u) { + if let Err(e) = services[i].stop() { + eprintln!("unable to stop service {} with id {} due to error {e}", &services[i].name(), &services[i].uuid()); + }; + services.swap_remove(i); + } + }, + Ok(InnerProtocol::Scan) => { + services.par_iter_mut() + .for_each(|service| { + match service.state() { + ServiceState::Failed => { + if let Err(e) = service.restart() { + eprintln!("unable to restart service {} with id {} due to error {e}", &service.name(), &service.uuid()); + } + }, + _ => (), + } + }); + }, + _ => std::thread::sleep(std::time::Duration::from_millis(100)), + } + } + }); + let listen = UnixListener::bind(sockaddr)?; + let queue = stx.clone(); + let ekill = etx.clone(); + for stream in listen.incoming() { + match stream { + Ok(mut stream) => { + match lrx.try_recv() { + Err(TryRecvError::Disconnected) | Ok(true) => { + ekill.send(true).unwrap_or_default(); + queue.send(InnerProtocol::Quit).unwrap_or_default(); + return Ok(()); + }, + _ => (), + } + let squeue = queue.clone(); + std::thread::spawn(move || { + let mut buf = String::new(); + if let Ok(len) = stream.read_to_string(&mut buf) { + if len > 0 { + let resp = match SalarymanPacket::deserialize(&buf.as_bytes()) { + Ok(SalarymanPacket::Create(sc)) => { + squeue.send(InnerProtocol::Create(sc.to_owned())).unwrap_or_default(); + SalarymanPacket::response(&SalarymanPacket::Create(sc)) + }, + Ok(SalarymanPacket::Delete(u)) => { + squeue.send(InnerProtocol::Delete(u.to_owned())).unwrap_or_default(); + SalarymanPacket::response(&SalarymanPacket::Delete(u)) + }, + Ok(SalarymanPacket::Start(u)) => { + squeue.send(InnerProtocol::Start(u.to_owned())).unwrap_or_default(); + SalarymanPacket::response(&SalarymanPacket::Start(u)) + }, + Ok(SalarymanPacket::Stop(u)) => { + squeue.send(InnerProtocol::Stop(u.to_owned())).unwrap_or_default(); + SalarymanPacket::response(&SalarymanPacket::Stop(u)) + }, + Ok(SalarymanPacket::Restart(u)) => { + squeue.send(InnerProtocol::Stop(u.to_owned())).unwrap_or_default(); + squeue.send(InnerProtocol::Start(u.to_owned())).unwrap_or_default(); + SalarymanPacket::response(&SalarymanPacket::Restart(u)) + }, + Ok(SalarymanPacket::Quit) => { + squeue.send(InnerProtocol::Quit).unwrap_or_default(); + SalarymanPacket::response(&SalarymanPacket::Quit) + }, + Ok(b) => SalarymanPacket::response(&b), + Err(_) => SalarymanPacket::Invalid, + }; + let resp = if let Ok(i) = SalarymanPacket::serialize(&resp) { + i + } else { + Vec::new() + }; + if let Ok(len) = stream.write(&resp) { + if len > 0 { + stream.flush().unwrap_or_default(); + } + } + } + } + }); + }, + Err(_) => { + match lrx.try_recv() { + Err(TryRecvError::Disconnected) | Ok(true) => { + ekill.send(true).unwrap_or_default(); + queue.send(InnerProtocol::Quit).unwrap_or_default(); + return Ok(()); + }, _ => (), } - }); - services.push(Service::new()); - std::thread::sleep(std::time::Duration::from_millis(100)); + }, + } } + Ok(()) } + diff --git a/src/service.rs b/src/service.rs index 28e4401..97dc359 100644 --- a/src/service.rs +++ b/src/service.rs @@ -21,6 +21,7 @@ pub struct ServiceConf { args: Option<String>, directory: Option<PathBuf>, autostart: bool, + oneshot: Option<bool>, } impl Default for ServiceConf { fn default() -> Self { @@ -37,6 +38,7 @@ impl ServiceConf { args: None, directory: None, autostart: false, + oneshot: None, } } /// Returns a new `ServiceConf` from parts. @@ -47,6 +49,7 @@ impl ServiceConf { args: Option<String>, directory: Option<PathBuf>, autostart: bool, + oneshot: Option<bool>, ) -> Self { Self { uuid, @@ -55,6 +58,7 @@ impl ServiceConf { args, directory, autostart, + oneshot, } } /// Returns a new `ServiceConf` from parts with new uuid. @@ -64,6 +68,7 @@ impl ServiceConf { args: Option<String>, directory: Option<PathBuf>, autostart: bool, + oneshot: Option<bool>, ) -> Self { Self { uuid: Uuid::new_v4(), @@ -72,6 +77,7 @@ impl ServiceConf { args, directory, autostart, + oneshot, } } /// Returns the `uuid::Uuid` associated with the service config @@ -98,6 +104,10 @@ impl ServiceConf { pub fn get_autostart(&self) -> bool { self.autostart } + /// Returns the oneshot value of the described service + pub fn get_oneshot(&self) -> &Option<bool> { + &self.oneshot + } /// Sets the name of the described service pub fn name(&mut self, name: &str) -> &mut Self { self.name = String::from(name); @@ -123,6 +133,15 @@ impl ServiceConf { self.autostart = autostart; self } + /// Sets the oneshot flag for the described service + pub fn oneshot(&mut self, oneshot: bool) -> &mut Self { + if oneshot { + self.oneshot = Some(true); + } else { + self.oneshot = None; + } + self + } /// Builds a Service from this object #[inline] pub fn build(&self) -> Result<Service, Box<dyn std::error::Error>> { @@ -175,6 +194,15 @@ impl<'a> Service { pub fn name(&self) -> &str { &self.config().get_name() } + /// Returns the uuid of the service, shorthand for Service::config().get_uuid() + #[inline] + pub fn uuid(&self) -> &Uuid { + &self.config().get_uuid() + } + #[inline] + pub fn oneshot(&self) -> &Option<bool> { + &self.config().get_oneshot() + } #[inline] fn create_dirs(&self) -> Result<(), Box<dyn std::error::Error>> { match std::fs::create_dir("./logs") { @@ -243,14 +271,21 @@ impl<'a> Service { } } /// Returns the state of the service - pub fn state(&mut self) -> Result<ServiceState, Box<dyn std::error::Error>> { + pub fn state(&mut self) -> ServiceState { if let Some(proc) = self.proc.as_mut() { match proc.try_wait() { - Err(_) | Ok(Some(_)) => Ok(ServiceState::Failed), - Ok(None) => Ok(ServiceState::Running), + Err(_) | Ok(Some(_)) => { + if let Some(b) = self.oneshot() { + if *b { + return ServiceState::Stopped; + } + } + ServiceState::Failed + }, + Ok(None) => ServiceState::Running, } } else { - Ok(ServiceState::Stopped) + ServiceState::Stopped } } /// Invokes kill on the service process |