From 2df86268298de8a4961c3296c19b0767565453e8 Mon Sep 17 00:00:00 2001 From: yuzu Date: Fri, 1 Aug 2025 08:48:17 +0000 Subject: unix socket get git-svn-id: svn+ssh://diminuette.aengel.lesbianunix.dev/salaryman/trunk@17 b9215c17-b818-4693-b096-d1e41a411fef --- src/server/main.rs | 249 +++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 231 insertions(+), 18 deletions(-) (limited to 'src/server') diff --git a/src/server/main.rs b/src/server/main.rs index b769fbf..61edd16 100644 --- a/src/server/main.rs +++ b/src/server/main.rs @@ -1,12 +1,15 @@ use clap::Parser; -use salaryman::service::{Service, ServiceConf, ServiceState}; +use rayon::prelude::*; +use salaryman::{Service, ServiceConf, ServiceState, SalarymanPacket}; use serde::{Deserialize, Serialize}; use std::{ - fs::read_to_string, - os::unix::net::{UnixListener, UnixStream}, + io::{Read, Write}, + fs::{File, read_to_string}, + os::unix::net::UnixListener, path::PathBuf, + sync::mpsc::{TryRecvError, channel}, }; -use rayon::prelude::*; +use uuid::Uuid; #[derive(Parser, Debug)] #[command(version, about, long_about = None)] @@ -16,9 +19,9 @@ struct Args { long, value_name = "FILE", help = "config file override", - default_value = "salaryman.toml" + default_value = "None", )] - config: PathBuf, + config: Option, #[arg( short, long, @@ -29,8 +32,14 @@ struct Args { socket: PathBuf, } -pub enum ServiceReq { +pub enum InnerProtocol { Create(ServiceConf), + Delete(Uuid), + Start(Uuid), + Stop(Uuid), + Write((Uuid, String)), + Scan, + Quit, } #[derive(Serialize, Deserialize, Clone, Debug)] @@ -66,10 +75,50 @@ fn load_config(file: &PathBuf) -> Result> { } } +fn _save_config(file: &PathBuf, conf: &Config) -> Result<(), Box> { + let mut f = File::options().create(true).truncate(true).write(true).open(file)?; + f.write(toml::to_string(conf)?.as_bytes())?; + Ok(()) +} + +#[cfg(not(target_os = "windows"))] // We cannot build on Windows fn main() -> Result<(), Box> { let args = Args::parse(); - let conf: Config = load_config(&args.config)?; - let _sockaddr = if let Some(sock) = conf.socket { + + // Config Handling + let local_conf = PathBuf::from("salaryman.conf"); + let xdg_conf = PathBuf::from({ + if let Ok(xcd) = std::env::var("XDG_CONFIG_HOME") { + xcd + "/smd/salaryman.toml" + } else { + if let Ok(homedir) = std::env::var("HOME") { + homedir + "/.config/smd/salaryman.conf" + } else { + "/root/.config/smd/salaryman.conf".into() + } + } + }); + let sys_conf = PathBuf::from("/etc/salaryman/smd/salaryman.conf"); + let confpaths = if let Some(path) = args.config.clone() { + vec![path] + } else { + vec![local_conf, xdg_conf, sys_conf] + }; + let get_conf = |confpaths, p: Option| -> (PathBuf, Config) { + for cfile in confpaths { + if let Ok(conf) = load_config(&cfile) { + return (cfile.to_owned(), conf); + } + } + if let Some(cpath) = p { + (cpath.to_owned(), Config::new()) + } else { + (PathBuf::from("salaryman.toml"), Config::new()) + } + }; + let (config_path, conf) = get_conf(confpaths, args.config.clone()); + + let sockaddr = if let Some(sock) = conf.socket { sock } else { args.socket @@ -78,16 +127,180 @@ fn main() -> Result<(), Box> { for service in conf.service { services.push(service.build()?); } - loop { - services.par_iter_mut() - .for_each(|service| { - match service.state().expect("unable to get service state") { - ServiceState::Failed => service.restart().expect("unable to restart service"), - ServiceState::Stopped => (), + // event loop sender + let (etx, erx) = channel(); + // listener + let (ltx, lrx) = channel(); + // main services event loop + let (stx, srx) = channel(); + + // for event loop sender to main services event + let queue = stx.clone(); + let lkill = ltx.clone(); + std::thread::spawn(move || { + loop { // event loop sender + match erx.try_recv() { + Err(TryRecvError::Disconnected) | Ok(true) => { + queue.send(InnerProtocol::Quit).unwrap_or_default(); + lkill.send(true).unwrap_or_default(); + return; + }, + _ => (), + } + queue.send(InnerProtocol::Scan).unwrap_or_default(); + std::thread::sleep(std::time::Duration::from_millis(250)); + } + }); + + let ekill = etx.clone(); + let lkill = ltx.clone(); + std::thread::spawn(move || { + // Main Services Event Loop + loop { + match srx.try_recv() { + Err(TryRecvError::Disconnected) | Ok(InnerProtocol::Quit) => { + services.par_iter_mut() + .for_each(|service| { + if let Err(e) = service.stop() { + eprintln!("unable to start service {} with id {} due to error {e}", &service.name(), &service.uuid()); + } + }); + ekill.send(true).unwrap_or_default(); + lkill.send(true).unwrap_or_default(); + return; + }, + Ok(InnerProtocol::Create(conf)) => if let Ok(s) = conf.build() { services.push(s); }, + Ok(InnerProtocol::Start(u)) => { + services.par_iter_mut() + .filter(|s| s.uuid() == &u) + .for_each(|s| { + if !s.started() { + if let Ok(_) = s.start() { + return; + } + } + }); + }, + Ok(InnerProtocol::Stop(u)) => { + services.par_iter_mut() + .filter(|s| s.uuid() == &u) + .for_each(|s| { + if s.started() { + if let Ok(_) = s.stop() { + return; + } + } + }); + }, + Ok(InnerProtocol::Write((u, buf))) => { + services.par_iter_mut() + .filter(|s| s.uuid() == &u) + .for_each(|s| { + if let Ok(_) = s.write_stdin(&buf) { + return; + } + }); + }, + Ok(InnerProtocol::Delete(u)) => { + if let Some(i) = services.par_iter_mut() + .position_first(|s| s.uuid() == &u) { + if let Err(e) = services[i].stop() { + eprintln!("unable to stop service {} with id {} due to error {e}", &services[i].name(), &services[i].uuid()); + }; + services.swap_remove(i); + } + }, + Ok(InnerProtocol::Scan) => { + services.par_iter_mut() + .for_each(|service| { + match service.state() { + ServiceState::Failed => { + if let Err(e) = service.restart() { + eprintln!("unable to restart service {} with id {} due to error {e}", &service.name(), &service.uuid()); + } + }, + _ => (), + } + }); + }, + _ => std::thread::sleep(std::time::Duration::from_millis(100)), + } + } + }); + let listen = UnixListener::bind(sockaddr)?; + let queue = stx.clone(); + let ekill = etx.clone(); + for stream in listen.incoming() { + match stream { + Ok(mut stream) => { + match lrx.try_recv() { + Err(TryRecvError::Disconnected) | Ok(true) => { + ekill.send(true).unwrap_or_default(); + queue.send(InnerProtocol::Quit).unwrap_or_default(); + return Ok(()); + }, + _ => (), + } + let squeue = queue.clone(); + std::thread::spawn(move || { + let mut buf = String::new(); + if let Ok(len) = stream.read_to_string(&mut buf) { + if len > 0 { + let resp = match SalarymanPacket::deserialize(&buf.as_bytes()) { + Ok(SalarymanPacket::Create(sc)) => { + squeue.send(InnerProtocol::Create(sc.to_owned())).unwrap_or_default(); + SalarymanPacket::response(&SalarymanPacket::Create(sc)) + }, + Ok(SalarymanPacket::Delete(u)) => { + squeue.send(InnerProtocol::Delete(u.to_owned())).unwrap_or_default(); + SalarymanPacket::response(&SalarymanPacket::Delete(u)) + }, + Ok(SalarymanPacket::Start(u)) => { + squeue.send(InnerProtocol::Start(u.to_owned())).unwrap_or_default(); + SalarymanPacket::response(&SalarymanPacket::Start(u)) + }, + Ok(SalarymanPacket::Stop(u)) => { + squeue.send(InnerProtocol::Stop(u.to_owned())).unwrap_or_default(); + SalarymanPacket::response(&SalarymanPacket::Stop(u)) + }, + Ok(SalarymanPacket::Restart(u)) => { + squeue.send(InnerProtocol::Stop(u.to_owned())).unwrap_or_default(); + squeue.send(InnerProtocol::Start(u.to_owned())).unwrap_or_default(); + SalarymanPacket::response(&SalarymanPacket::Restart(u)) + }, + Ok(SalarymanPacket::Quit) => { + squeue.send(InnerProtocol::Quit).unwrap_or_default(); + SalarymanPacket::response(&SalarymanPacket::Quit) + }, + Ok(b) => SalarymanPacket::response(&b), + Err(_) => SalarymanPacket::Invalid, + }; + let resp = if let Ok(i) = SalarymanPacket::serialize(&resp) { + i + } else { + Vec::new() + }; + if let Ok(len) = stream.write(&resp) { + if len > 0 { + stream.flush().unwrap_or_default(); + } + } + } + } + }); + }, + Err(_) => { + match lrx.try_recv() { + Err(TryRecvError::Disconnected) | Ok(true) => { + ekill.send(true).unwrap_or_default(); + queue.send(InnerProtocol::Quit).unwrap_or_default(); + return Ok(()); + }, _ => (), } - }); - services.push(Service::new()); - std::thread::sleep(std::time::Duration::from_millis(100)); + }, + } } + Ok(()) } + -- cgit 1.4.1-2-gfad0