about summary refs log tree commit diff stats
path: root/src/server
diff options
context:
space:
mode:
authoryuzu <yuzu@b9215c17-b818-4693-b096-d1e41a411fef>2025-08-01 08:48:17 +0000
committeryuzu <yuzu@b9215c17-b818-4693-b096-d1e41a411fef>2025-08-01 08:48:17 +0000
commit2df86268298de8a4961c3296c19b0767565453e8 (patch)
tree1f048c96efaeb0959e04ad4273a149a108491361 /src/server
parent40a65f981a664a186a52fc2981b99a6d8a1191d4 (diff)
downloadsalaryman-canon.tar.gz
salaryman-canon.tar.bz2
salaryman-canon.zip
unix socket get HEAD canon
git-svn-id: svn+ssh://diminuette.aengel.lesbianunix.dev/salaryman/trunk@17 b9215c17-b818-4693-b096-d1e41a411fef
Diffstat (limited to 'src/server')
-rw-r--r--src/server/main.rs249
1 files changed, 231 insertions, 18 deletions
diff --git a/src/server/main.rs b/src/server/main.rs
index b769fbf..61edd16 100644
--- a/src/server/main.rs
+++ b/src/server/main.rs
@@ -1,12 +1,15 @@
 use clap::Parser;
-use salaryman::service::{Service, ServiceConf, ServiceState};
+use rayon::prelude::*;
+use salaryman::{Service, ServiceConf, ServiceState, SalarymanPacket};
 use serde::{Deserialize, Serialize};
 use std::{
-    fs::read_to_string,
-    os::unix::net::{UnixListener, UnixStream},
+    io::{Read, Write},
+    fs::{File, read_to_string},
+    os::unix::net::UnixListener,
     path::PathBuf,
+    sync::mpsc::{TryRecvError, channel},
 };
-use rayon::prelude::*;
+use uuid::Uuid;
 
 #[derive(Parser, Debug)]
 #[command(version, about, long_about = None)]
@@ -16,9 +19,9 @@ struct Args {
         long,
         value_name = "FILE",
         help = "config file override",
-        default_value = "salaryman.toml"
+        default_value = "None",
     )]
-    config: PathBuf,
+    config: Option<PathBuf>,
     #[arg(
         short,
         long,
@@ -29,8 +32,14 @@ struct Args {
     socket: PathBuf,
 }
 
-pub enum ServiceReq {
+pub enum InnerProtocol {
     Create(ServiceConf),
+    Delete(Uuid),
+    Start(Uuid),
+    Stop(Uuid),
+    Write((Uuid, String)),
+    Scan,
+    Quit,
 }
 
 #[derive(Serialize, Deserialize, Clone, Debug)]
@@ -66,10 +75,50 @@ fn load_config(file: &PathBuf) -> Result<Config, Box<dyn std::error::Error>> {
     }
 }
 
+fn _save_config(file: &PathBuf, conf: &Config) -> Result<(), Box<dyn std::error::Error>> {
+    let mut f = File::options().create(true).truncate(true).write(true).open(file)?;
+    f.write(toml::to_string(conf)?.as_bytes())?;
+    Ok(())
+}
+
+#[cfg(not(target_os = "windows"))] // We cannot build on Windows
 fn main() -> Result<(), Box<dyn std::error::Error>> {
     let args = Args::parse();
-    let conf: Config = load_config(&args.config)?;
-    let _sockaddr = if let Some(sock) = conf.socket {
+
+    // Config Handling
+    let local_conf = PathBuf::from("salaryman.conf");
+    let xdg_conf = PathBuf::from({
+        if let Ok(xcd) = std::env::var("XDG_CONFIG_HOME") {
+            xcd + "/smd/salaryman.toml"
+        } else {
+            if let Ok(homedir) = std::env::var("HOME") {
+                homedir + "/.config/smd/salaryman.conf"
+            } else {
+                "/root/.config/smd/salaryman.conf".into()
+            }
+        }
+    });
+    let sys_conf = PathBuf::from("/etc/salaryman/smd/salaryman.conf");
+    let confpaths = if let Some(path) = args.config.clone() {
+        vec![path]
+    } else {
+        vec![local_conf, xdg_conf, sys_conf]
+    };
+    let get_conf = |confpaths, p: Option<PathBuf>| -> (PathBuf, Config) {
+        for cfile in confpaths {
+            if let Ok(conf) = load_config(&cfile) {
+                return (cfile.to_owned(), conf);
+            }
+        }
+        if let Some(cpath) = p {
+            (cpath.to_owned(), Config::new())
+        } else {
+            (PathBuf::from("salaryman.toml"), Config::new())
+        }
+    };
+    let (config_path, conf) = get_conf(confpaths, args.config.clone());
+
+    let sockaddr = if let Some(sock) = conf.socket {
         sock
     } else {
         args.socket
@@ -78,16 +127,180 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
     for service in conf.service {
         services.push(service.build()?);
     }
-    loop {
-        services.par_iter_mut()
-            .for_each(|service| {
-                match service.state().expect("unable to get service state") {
-                    ServiceState::Failed => service.restart().expect("unable to restart service"),
-                    ServiceState::Stopped => (),
+    // event loop sender
+    let (etx, erx) = channel();
+    // listener
+    let (ltx, lrx) = channel();
+    // main services event loop
+    let (stx, srx) = channel();
+
+    // for event loop sender to main services event
+    let queue = stx.clone();
+    let lkill = ltx.clone();
+    std::thread::spawn(move || {
+        loop { // event loop sender
+            match erx.try_recv() {
+                Err(TryRecvError::Disconnected) | Ok(true) => {
+                    queue.send(InnerProtocol::Quit).unwrap_or_default();
+                    lkill.send(true).unwrap_or_default();
+                    return;
+                },
+                _ => (),
+            }
+            queue.send(InnerProtocol::Scan).unwrap_or_default();
+            std::thread::sleep(std::time::Duration::from_millis(250));
+        }
+    });
+
+    let ekill = etx.clone();
+    let lkill = ltx.clone();
+    std::thread::spawn(move || {
+        // Main Services Event Loop
+        loop {
+            match srx.try_recv() {
+                Err(TryRecvError::Disconnected) | Ok(InnerProtocol::Quit) => {
+                    services.par_iter_mut()
+                        .for_each(|service| {
+                            if let Err(e) = service.stop() {
+                                eprintln!("unable to start service {} with id {} due to error {e}", &service.name(), &service.uuid());
+                            }
+                        });
+                    ekill.send(true).unwrap_or_default();
+                    lkill.send(true).unwrap_or_default();
+                    return;
+                },
+                Ok(InnerProtocol::Create(conf)) => if let Ok(s) = conf.build() { services.push(s); },
+                Ok(InnerProtocol::Start(u)) => {
+                    services.par_iter_mut()
+                        .filter(|s| s.uuid() == &u)
+                        .for_each(|s| {
+                            if !s.started() {
+                                if let Ok(_) = s.start() {
+                                    return;
+                                }
+                            }
+                        });
+                },
+                Ok(InnerProtocol::Stop(u)) => {
+                    services.par_iter_mut()
+                        .filter(|s| s.uuid() == &u)
+                        .for_each(|s| {
+                            if s.started() {
+                                if let Ok(_) = s.stop() {
+                                    return;
+                                }
+                            }
+                        });
+                },
+                Ok(InnerProtocol::Write((u, buf))) => {
+                    services.par_iter_mut()
+                        .filter(|s| s.uuid() == &u)
+                        .for_each(|s| {
+                            if let Ok(_) = s.write_stdin(&buf) {
+                                return;
+                            }
+                        });
+                },
+                Ok(InnerProtocol::Delete(u)) => {
+                    if let Some(i) = services.par_iter_mut()
+                        .position_first(|s| s.uuid() == &u) {
+                        if let Err(e) = services[i].stop() {
+                            eprintln!("unable to stop service {} with id {} due to error {e}", &services[i].name(), &services[i].uuid());
+                        };
+                        services.swap_remove(i);
+                    }
+                },
+                Ok(InnerProtocol::Scan) => {
+                    services.par_iter_mut()
+                        .for_each(|service| {
+                            match service.state() {
+                                ServiceState::Failed => {
+                                    if let Err(e) = service.restart() {
+                                        eprintln!("unable to restart service {} with id {} due to error {e}", &service.name(), &service.uuid());
+                                    }
+                                },
+                                _ => (),
+                            }
+                        });
+                },
+                _ => std::thread::sleep(std::time::Duration::from_millis(100)),
+            }
+        }
+    });
+    let listen = UnixListener::bind(sockaddr)?;
+    let queue = stx.clone();
+    let ekill = etx.clone();
+    for stream in listen.incoming() {
+        match stream {
+            Ok(mut stream) => {
+                match lrx.try_recv() {
+                    Err(TryRecvError::Disconnected) | Ok(true) => {
+                        ekill.send(true).unwrap_or_default();
+                        queue.send(InnerProtocol::Quit).unwrap_or_default();
+                        return Ok(());
+                    },
+                    _ => (),
+                }
+                let squeue = queue.clone();
+                std::thread::spawn(move || {
+                    let mut buf = String::new();
+                    if let Ok(len) = stream.read_to_string(&mut buf) {
+                        if len > 0 {
+                            let resp = match SalarymanPacket::deserialize(&buf.as_bytes()) {
+                                Ok(SalarymanPacket::Create(sc)) => {
+                                    squeue.send(InnerProtocol::Create(sc.to_owned())).unwrap_or_default();
+                                    SalarymanPacket::response(&SalarymanPacket::Create(sc))
+                                },
+                                Ok(SalarymanPacket::Delete(u)) => {
+                                    squeue.send(InnerProtocol::Delete(u.to_owned())).unwrap_or_default();
+                                    SalarymanPacket::response(&SalarymanPacket::Delete(u))
+                                },
+                                Ok(SalarymanPacket::Start(u)) => {
+                                    squeue.send(InnerProtocol::Start(u.to_owned())).unwrap_or_default();
+                                    SalarymanPacket::response(&SalarymanPacket::Start(u))
+                                },
+                                Ok(SalarymanPacket::Stop(u)) => {
+                                    squeue.send(InnerProtocol::Stop(u.to_owned())).unwrap_or_default();
+                                    SalarymanPacket::response(&SalarymanPacket::Stop(u))
+                                },
+                                Ok(SalarymanPacket::Restart(u)) => {
+                                    squeue.send(InnerProtocol::Stop(u.to_owned())).unwrap_or_default();
+                                    squeue.send(InnerProtocol::Start(u.to_owned())).unwrap_or_default();
+                                    SalarymanPacket::response(&SalarymanPacket::Restart(u))
+                                },
+                                Ok(SalarymanPacket::Quit) => {
+                                    squeue.send(InnerProtocol::Quit).unwrap_or_default();
+                                    SalarymanPacket::response(&SalarymanPacket::Quit)
+                                },
+                                Ok(b) => SalarymanPacket::response(&b),
+                                Err(_) => SalarymanPacket::Invalid,
+                            };
+                            let resp = if let Ok(i) = SalarymanPacket::serialize(&resp) {
+                                i
+                            } else {
+                                Vec::new()
+                            };
+                            if let Ok(len) = stream.write(&resp) {
+                                if len > 0 {
+                                    stream.flush().unwrap_or_default();
+                                }
+                            }
+                        }
+                    }
+                });
+            },
+            Err(_) => {
+                match lrx.try_recv() {
+                    Err(TryRecvError::Disconnected) | Ok(true) => {
+                        ekill.send(true).unwrap_or_default();
+                        queue.send(InnerProtocol::Quit).unwrap_or_default();
+                        return Ok(());
+                    },
                     _ => (),
                 }
-            });
-        services.push(Service::new());
-        std::thread::sleep(std::time::Duration::from_millis(100));
+            },
+        }
     }
+    Ok(())
 }
+