about summary refs log tree commit diff stats
diff options
context:
space:
mode:
authoryuzu <yuzu@b9215c17-b818-4693-b096-d1e41a411fef>2025-07-08 01:49:14 +0000
committeryuzu <yuzu@b9215c17-b818-4693-b096-d1e41a411fef>2025-07-08 01:49:14 +0000
commit8f05a437d80a243e504b4fb5d26b53bbd7de9c47 (patch)
treeea57f02f9b5e5a98db9028ca5ed2d2c828985ddd
parent8880b0afd01bd9afb4a47a76d90e8b90e5cbff1e (diff)
downloadsalaryman-8f05a437d80a243e504b4fb5d26b53bbd7de9c47.tar.gz
salaryman-8f05a437d80a243e504b4fb5d26b53bbd7de9c47.tar.bz2
salaryman-8f05a437d80a243e504b4fb5d26b53bbd7de9c47.zip
go fully async
git-svn-id: svn+ssh://diminuette.aengel.lesbianunix.dev/salaryman/trunk@8 b9215c17-b818-4693-b096-d1e41a411fef
-rw-r--r--salaryman.toml13
-rw-r--r--src/lib.rs1
-rw-r--r--src/main.rs47
-rw-r--r--src/model.rs276
4 files changed, 200 insertions, 137 deletions
diff --git a/salaryman.toml b/salaryman.toml
index b0d600b..91cfe9b 100644
--- a/salaryman.toml
+++ b/salaryman.toml
@@ -2,14 +2,9 @@ address = "0.0.0.0"
 port = 8080
 
 [[service]]
-name = "minecraft"
-command = "java"
-args = "-jar minecraft_server.jar nogui"
-directory = "/home/yuzu/minecraft"
+name = "cattest"
+command = "cat"
+args = "/var/log/messages"
+directory = "/"
 autostart = true
 
-[[service]]
-name = "derpcraft"
-command = "java"
-args = "-jar minecraft_server.jar"
-autostart = false
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..65880be
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1 @@
+pub mod model;
diff --git a/src/main.rs b/src/main.rs
index f4e4b1c..09a9376 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,17 +1,11 @@
-pub mod model;
-
 use clap::Parser;
 use serde::{Deserialize, Serialize};
 
 use tokio::fs::read_to_string;
 
-use std::{
-    net::IpAddr,
-    path::PathBuf,
-    sync::Arc,
-};
+use std::{net::IpAddr, path::PathBuf};
 
-use crate::model::{Service, ServiceConf};
+use salaryman::model::{Service, ServiceConf};
 
 #[derive(Parser, Debug)]
 #[command(version, about, long_about = None)]
@@ -75,41 +69,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let conf: Config = load_config(&args.config).await;
     let mut services: Vec<Service> = Vec::new();
     for i in 0..conf.service.len() {
-        services.push(Service::from_conf(conf.service[i].clone()));
+        services.push(Service::from_conf(&conf.service[i]));
         if conf.service[i].autostart {
-            services[i].start()?;
+            services[i].start().await?;
         }
     }
+    let mut outs: Vec<(String, tokio::sync::mpsc::Receiver<String>)> = Vec::new();
     for i in 0..services.len() {
-        println!("loop -1");
-        if let Ok(s) = services[i].stdout() {
-            for line in s.lines() {
-                println!("STDOUT :: [{}] :: {}", services[i].config.name, line);
-            }
-        }
-        if let Ok(s) = services[i].stderr() {
-            for line in s.lines() {
-                println!("STDERR :: [{}] :: {}", services[i].config.name, line);
-            }
+        if services[i].started().await {
+            outs.push((services[i].name().await, services[i].scan_stdout().await?));
         }
     }
-    for e in 0..100 {
-        println!("loop {e}");
-        for i in 0..services.len() {
-            if let Ok(s) = services[i].stdout() {
-                for line in s.lines() {
-                    println!("STDOUT :: [{}] :: {}", services[i].config.name, line);
-                }
-            }
-            if let Ok(s) = services[i].stderr() {
-                for line in s.lines() {
-                    println!("STDERR :: [{}] :: {}", services[i].config.name, line);
-                }
+    for _i in 0..100 {
+        for out in 0..outs.len() {
+            if let Some(s) = outs[out].1.recv().await {
+                println!("got line from {} :: {}", outs[out].0, s);
             }
-        };
+        }
     }
     for mut service in services {
-        match service.stop() {
+        match service.stop().await {
             Ok(_) => println!("lol it was killed"),
             Err(_) => println!("it either didn't exist, or failed to kill"),
         }
diff --git a/src/model.rs b/src/model.rs
index 52408fc..8f4416b 100644
--- a/src/model.rs
+++ b/src/model.rs
@@ -1,22 +1,33 @@
 use serde::{Deserialize, Serialize};
-
-use std::{
-    fs::canonicalize,
-    io::{Read, BufRead, BufReader},
-    path::PathBuf,
-    process::{Child, Command, Stdio},
-    sync::{Arc, Mutex},
+use tokio::{
+    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
+    process::{Child, Command},
+    sync::{
+        Mutex,
+        mpsc::{Receiver, channel},
+    },
+    task::spawn,
 };
 
+use std::{path::PathBuf, process::Stdio, sync::Arc};
+
 #[derive(Serialize, Deserialize, Clone, Debug)]
 pub struct ServiceConf {
     pub name: String,
-    command: String,
-    args: Option<String>,
-    directory: Option<PathBuf>,
+    pub command: String,
+    pub args: Option<String>,
+    pub directory: Option<PathBuf>,
     pub autostart: bool,
 }
+impl Default for ServiceConf {
+    fn default() -> Self {
+        Self::new()
+    }
+}
 impl ServiceConf {
+    /**
+     *  Returns a new empty `ServiceConf`
+     */
     pub fn new() -> Self {
         Self {
             name: String::new(),
@@ -26,6 +37,9 @@ impl ServiceConf {
             autostart: false,
         }
     }
+    /**
+     *  Returns a new `ServiceConf` from parts.
+     */
     pub fn from_parts(
         name: String,
         command: String,
@@ -45,126 +59,200 @@ impl ServiceConf {
 
 #[derive(Debug)]
 pub struct Service {
-    pub config: ServiceConf,
-    process: Option<Mutex<Child>>,
+    conf: ServiceConf,
+    proc: Option<Arc<Mutex<Child>>>,
+    stdout: Option<Arc<Mutex<Receiver<String>>>>,
+    stderr: Option<Arc<Mutex<Receiver<String>>>>,
+}
+impl Default for Service {
+    fn default() -> Self {
+        Self::new()
+    }
 }
 impl Service {
+    /**
+     *  Returns a new empty `Service`
+     */
     pub fn new() -> Self {
         Self {
-            config: ServiceConf::new(),
-            process: None,
+            conf: ServiceConf::default(),
+            proc: None,
+            stdout: None,
+            stderr: None,
         }
     }
-    pub fn from_conf(config: ServiceConf) -> Self {
+    /**
+     *  Returns a `Service` made from a `ServiceConf`.
+     */
+    pub fn from_conf(conf: &ServiceConf) -> Self {
         Self {
-            config,
-            process: None,
+            conf: conf.clone(),
+            proc: None,
+            stdout: None,
+            stderr: None,
         }
     }
-    pub fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> {
-        if self.process.is_some() {
+    /**
+     *  Returns the name of the service
+     */
+    pub async fn name(&self) -> &str {
+        &self.conf.name
+    }
+    /**
+     *  Uses `tokio::process::Command` to start the service.
+     */
+    pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> {
+        if self.proc.is_some() {
             return Err(Box::new(std::io::Error::new(
                 std::io::ErrorKind::AlreadyExists,
-                "process already exists!",
+                "Process Already Exists",
             )));
         }
-        let command: &str = &self.config.command.as_str();
-        let args: Vec<String> = if let Some(a) = &self.config.args {
-            let mut v: Vec<String> = Vec::new();
-            for arg in a.split_whitespace() {
-                v.push(String::from(arg));
-            }
-            v
+        let cmd = &self.conf.command;
+        let args = if let Some(a) = &self.conf.args {
+            a.split_whitespace()
         } else {
-            Vec::new()
+            "".split_whitespace()
         };
-        if let Some(cwd) = &self.config.directory {
-            let child = Command::new(command)
-                .args(args)
-                .current_dir(canonicalize(cwd)?)
-                .stdin(Stdio::piped())
-                .stdout(Stdio::piped())
-                .stderr(Stdio::piped())
-                .spawn()?;
-            self.process = Some(Mutex::new(child));
+        let cwd = if let Some(c) = &self.conf.directory {
+            c
         } else {
-            let child = Command::new(command)
-                .args(args)
-                .stdin(Stdio::piped())
-                .stdout(Stdio::piped())
-                .stderr(Stdio::piped())
-                .spawn()?;
-            self.process = Some(Mutex::new(child));
+            &PathBuf::from("/")
         };
+        let child = Command::new(cmd)
+            .args(args)
+            .current_dir(cwd)
+            .stdin(Stdio::piped())
+            .stdout(Stdio::piped())
+            .stderr(Stdio::piped())
+            .spawn()?;
+        self.proc = Some(Arc::new(Mutex::new(child)));
         Ok(())
     }
-    pub fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> {
-        if let Some(process) = &self.process {
-            if let Ok(mut process) = process.lock() {
-                process.kill()?;
-            } else {
-                return Err(Box::new(std::io::Error::new(
-                    std::io::ErrorKind::ResourceBusy,
-                    "cannot acquire lock",
-                )))
-            }
+    /**
+     *  Returns true when process is started and false when process is stopped.
+     */
+    pub async fn started(&self) -> bool {
+        self.proc.is_some()
+    }
+    /**
+     *  Invokes kill on the service process
+     */
+    pub async fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> {
+        if let Some(proc) = self.proc.clone() {
+            let mut lock = proc.lock().await;
+            lock.kill().await?;
+            drop(lock);
+            self.proc = None;
+            Ok(())
         } else {
-            return Err(Box::new(std::io::Error::new(
+            Err(Box::new(std::io::Error::new(
                 std::io::ErrorKind::NotFound,
-                "process already exists!",
-            )));
-        };
-        self.process = None;
+                "No Process Associated with Service",
+            )))
+        }
+    }
+    /**
+     *  Restarts service process
+     */
+    pub async fn restart(&mut self) -> Result<(), Box<dyn std::error::Error>> {
+        self.stop().await?;
+        self.start().await?;
         Ok(())
     }
-    //TODO: this function needs to fork and do message passing via mpsc channels.
-    pub fn stdout(&mut self) -> Result<String, Box<dyn std::error::Error>> {
-        if let Some(process) = &mut self.process {
-            if let Ok(mut process) = process.lock() {
-                if let Some(stdout) = process.stdout.as_mut() {
-                    let reader = BufReader::new(stdout);
-                    Ok(String::from(reader.lines().filter_map(|line| line.ok()).collect::<String>()))
-                } else {
-                    Err(Box::new(std::io::Error::new(
-                        std::io::ErrorKind::ResourceBusy,
-                        "cannot acquire stdout",
-                    )))
-                }
+    /**
+     *  Takes control of service process' stdout file handle and spawns a new task to continuously
+     *  scan it.
+     */
+    pub async fn scan_stdout(&mut self) -> Result<(), Box<dyn std::error::Error>> {
+        if let Some(proc) = self.proc.clone() {
+            let mut lock = proc.lock().await;
+            let stdout = if let Some(stdout) = lock.stdout.take() {
+                stdout
             } else {
-                Err(Box::new(std::io::Error::new(
-                    std::io::ErrorKind::ResourceBusy,
-                    "cannot acquire lock",
-                )))
-            }
+                return Err(Box::new(std::io::Error::new(
+                    std::io::ErrorKind::NotFound,
+                    "No stdout handle associated with process",
+                )));
+            };
+            drop(lock);
+            let (tx, rx) = channel(100);
+            let sname = self.conf.name.clone();
+            spawn(async move {
+                let mut br = BufReader::new(stdout).lines();
+                while let Ok(Some(line)) = br.next_line().await {
+                    println!("{} :: {}", &sname, &line);
+                    if let Err(_) = tx.send(line).await {
+                        return;
+                    };
+                }
+            });
+            self.stdout = Some(Arc::new(Mutex::new(rx)));
+            Ok(())
         } else {
             Err(Box::new(std::io::Error::new(
                 std::io::ErrorKind::NotFound,
-                "is the process started?",
+                "No Process Associated with Service",
             )))
         }
     }
-    //TODO: this function needs to fork and do message passing via mpsc channels.
-    //TODO: this function needs to use a bufreader instead
-    pub fn stderr(&mut self) -> Result<String, Box<dyn std::error::Error>> {
-        if let Some(process) = &mut self.process {
-            if let Ok(mut process) = process.lock() {
-                let mut s = String::new();
-                if let Some(stderr) = process.stderr.as_mut() {
-                    stderr.read_to_string(&mut s)?;
+    /**
+     *  Takes control of service process' stderr file handle and spawns a new task to continuously
+     *  scan it.
+     */
+    pub async fn scan_stderr(&mut self) -> Result<(), Box<dyn std::error::Error>> {
+        if let Some(proc) = self.proc.clone() {
+            let mut lock = proc.lock().await;
+            let stderr = if let Some(stderr) = lock.stderr.take() {
+                stderr
+            } else {
+                return Err(Box::new(std::io::Error::new(
+                    std::io::ErrorKind::NotFound,
+                    "No stderr handle associated with process",
+                )));
+            };
+            drop(lock);
+            let (tx, rx) = channel(100);
+            let sname = self.conf.name.clone();
+            spawn(async move {
+                let mut br = BufReader::new(stderr).lines();
+                while let Ok(Some(line)) = br.next_line().await {
+                    println!("ERR :: {} :: {}", &sname, &line);
+                    if let Err(_) = tx.send(line).await {
+                        return;
+                    };
                 }
-                Ok(s)
+            });
+            self.stderr = Some(Arc::new(Mutex::new(rx)));
+            Ok(())
+        } else {
+            Err(Box::new(std::io::Error::new(
+                std::io::ErrorKind::NotFound,
+                "No Process Associated with Service",
+            )))
+        }
+    }
+    /**
+     *  Writes to the service process' stdin, if it exists.
+     */
+    pub async fn write_stdin(&mut self, buf: String) -> Result<(), Box<dyn std::error::Error>> {
+        if let Some(proc) = self.proc.clone() {
+            let mut lock = proc.lock().await;
+            let stdin = if let Some(stdin) = lock.stdin.as_mut() {
+                stdin
             } else {
-                Err(Box::new(std::io::Error::new(
-                    std::io::ErrorKind::ResourceBusy,
-                    "cannot acquire lock",
-                )))
-            }
+                return Err(Box::new(std::io::Error::new(
+                    std::io::ErrorKind::NotFound,
+                    "No stdin handle associated with process",
+                )));
+            };
+            stdin.write(&buf.as_bytes()).await?;
+            Ok(())
         } else {
             Err(Box::new(std::io::Error::new(
                 std::io::ErrorKind::NotFound,
-                "is the process started?",
+                "No Process Associated with Service",
             )))
         }
     }
 }
-