diff options
author | yuzu <yuzu@b9215c17-b818-4693-b096-d1e41a411fef> | 2025-07-09 20:24:36 +0000 |
---|---|---|
committer | yuzu <yuzu@b9215c17-b818-4693-b096-d1e41a411fef> | 2025-07-09 20:24:36 +0000 |
commit | bc0fac1bf2a02c9a76c8c4b9eccc9235de4e86b2 (patch) | |
tree | aa8bd79bacddd6830453e8a8beb5ea976155c5fe /src | |
parent | b2cf95ed207e42d1f57de1e5b5030af8c734103b (diff) | |
download | salaryman-bc0fac1bf2a02c9a76c8c4b9eccc9235de4e86b2.tar.gz salaryman-bc0fac1bf2a02c9a76c8c4b9eccc9235de4e86b2.tar.bz2 salaryman-bc0fac1bf2a02c9a76c8c4b9eccc9235de4e86b2.zip |
working network communication
git-svn-id: svn+ssh://diminuette.aengel.lesbianunix.dev/salaryman/trunk@13 b9215c17-b818-4693-b096-d1e41a411fef
Diffstat (limited to 'src')
-rw-r--r-- | src/service.rs | 40 | ||||
-rw-r--r-- | src/smd/context.rs | 27 | ||||
-rw-r--r-- | src/smd/endpoints.rs | 36 | ||||
-rw-r--r-- | src/smd/main.rs | 71 |
4 files changed, 147 insertions, 27 deletions
diff --git a/src/service.rs b/src/service.rs index b42daba..dee6e59 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,3 +1,4 @@ +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, @@ -8,11 +9,13 @@ use tokio::{ }, task::spawn, }; +use uuid::Uuid; use std::{path::PathBuf, process::Stdio, sync::Arc}; -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug)] pub struct ServiceConf { + pub uuid: Uuid, pub name: String, pub command: String, pub args: Option<String>, @@ -30,6 +33,7 @@ impl ServiceConf { */ pub fn new() -> Self { Self { + uuid: Uuid::new_v4(), name: String::new(), command: String::new(), args: None, @@ -41,6 +45,7 @@ impl ServiceConf { * Returns a new `ServiceConf` from parts. */ pub fn from_parts( + uuid: Uuid, name: String, command: String, args: Option<String>, @@ -48,6 +53,26 @@ impl ServiceConf { autostart: bool, ) -> Self { Self { + uuid, + name, + command, + args, + directory, + autostart, + } + } + /** + * Returns a new `ServiceConf` from parts with new uuid. + */ + pub fn new_from_parts( + name: String, + command: String, + args: Option<String>, + directory: Option<PathBuf>, + autostart: bool, + ) -> Self { + Self { + uuid: Uuid::new_v4(), name, command, args, @@ -174,10 +199,11 @@ impl Service { drop(lock); let (tx, rx) = channel(1024); let sname = self.conf.name.clone(); + let suuid = self.conf.uuid.clone(); spawn(async move { let mut br = BufReader::new(stdout).lines(); while let Ok(Some(line)) = br.next_line().await { - println!("{} :: {}", &sname, &line); + println!("{} ({}) :: {}", &suuid, &sname, &line); if let Err(_) = tx.send(line).await { return; }; @@ -210,10 +236,11 @@ impl Service { drop(lock); let (tx, rx) = channel(1024); let sname = self.conf.name.clone(); + let suuid = self.conf.uuid.clone(); spawn(async move { let mut br = BufReader::new(stderr).lines(); while let Ok(Some(line)) = br.next_line().await { - eprintln!("{} :: {}", &sname, &line); + eprintln!("{} ({}) >< {}", &suuid, &sname, &line); if let Err(_) = tx.send(line).await { return; }; @@ -252,4 +279,11 @@ impl Service { ))) } } + /** + * Writes a line to the service process' stdin, if it exists. + */ + #[inline] + pub async fn writeln_stdin(&mut self, buf: String) -> Result<(), Box<dyn std::error::Error>> { + self.write_stdin(format!("{}\n", buf)).await + } } diff --git a/src/smd/context.rs b/src/smd/context.rs new file mode 100644 index 0000000..d8194c5 --- /dev/null +++ b/src/smd/context.rs @@ -0,0 +1,27 @@ +use super::Config; +use salaryman::service::Service; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tokio::sync::Mutex; + +pub struct SalarymanDContext { + pub config: Config, + pub service: Vec<Arc<Mutex<Service>>>, +} +impl SalarymanDContext { + pub fn new() -> Self { + Self { + config: Config::new(), + service: Vec::new(), + } + } + pub fn from_parts(config: Config, service: Vec<Arc<Mutex<Service>>>) -> Self { + Self { config, service } + } +} + +#[derive(Serialize, Deserialize, JsonSchema, Debug)] +pub struct StdinBuffer { + pub string: String, +} diff --git a/src/smd/endpoints.rs b/src/smd/endpoints.rs index 8b13789..48c9720 100644 --- a/src/smd/endpoints.rs +++ b/src/smd/endpoints.rs @@ -1 +1,37 @@ +use super::{ + Config, + context::{SalarymanDContext, StdinBuffer}, +}; +use dropshot::{HttpError, HttpResponseOk, RequestContext, TypedBody, endpoint}; +use std::sync::Arc; +#[endpoint { + method = GET, + path = "/config", +}] +pub async fn endpoint_get_config( + rqctx: RequestContext<Arc<SalarymanDContext>>, +) -> Result<HttpResponseOk<Config>, HttpError> { + Ok(HttpResponseOk(rqctx.context().config.clone())) +} + +#[endpoint { + method = PUT, + path = "/services/write" +}] +pub async fn endpoint_post_stdin( + rqctx: RequestContext<Arc<SalarymanDContext>>, + update: TypedBody<StdinBuffer>, +) -> Result<HttpResponseOk<()>, HttpError> { + let ctx = rqctx.context(); + let stdin_str = update.into_inner(); + for i in 0..ctx.service.len() { + let mut lock = ctx.service[i].lock().await; + if lock.started().await { + lock.writeln_stdin(stdin_str.string.clone()).await.unwrap(); //TODO: PROPERLY HANDLE ERROR! + } + drop(lock); + } + + Ok(HttpResponseOk(())) +} diff --git a/src/smd/main.rs b/src/smd/main.rs index fee2d9e..628785a 100644 --- a/src/smd/main.rs +++ b/src/smd/main.rs @@ -1,11 +1,17 @@ +mod context; mod endpoints; use clap::Parser; +use dropshot::{ApiDescription, ConfigLogging, ConfigLoggingLevel, ServerBuilder}; use salaryman::service::{Service, ServiceConf}; +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use tokio::fs::read_to_string; +use tokio::{fs::read_to_string, sync::Mutex}; -use std::{net::IpAddr, path::PathBuf}; +use std::{net::IpAddr, path::PathBuf, sync::Arc}; + +use crate::context::SalarymanDContext; +use crate::endpoints::{endpoint_get_config, endpoint_post_stdin}; #[derive(Parser, Debug)] #[command(version, about, long_about = None)] @@ -36,11 +42,28 @@ struct Args { port: u16, } -#[derive(Serialize, Deserialize, Debug)] -struct Config { - address: Option<IpAddr>, - port: Option<u16>, - service: Vec<ServiceConf>, +#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug)] +pub struct User { + pub username: String, + pub token: String, +} + +#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug)] +pub struct Config { + pub address: Option<IpAddr>, + pub port: Option<u16>, + pub user: Vec<User>, + pub service: Vec<ServiceConf>, +} +impl Config { + pub fn new() -> Self { + Self { + address: None, + port: None, + user: Vec::new(), + service: Vec::new(), + } + } } async fn load_config(file: &PathBuf) -> Result<Config, Box<dyn std::error::Error>> { @@ -66,26 +89,26 @@ async fn load_config(file: &PathBuf) -> Result<Config, Box<dyn std::error::Error async fn main() -> Result<(), Box<dyn std::error::Error>> { let args = Args::parse(); let conf: Config = load_config(&args.config).await?; - let mut services: Vec<Service> = Vec::new(); + let mut services: Vec<Arc<Mutex<Service>>> = Vec::new(); for i in 0..conf.service.len() { - services.push(Service::from_conf(&conf.service[i])); + services.push(Arc::new(Mutex::new(Service::from_conf(&conf.service[i])))); if conf.service[i].autostart { - services[i].start().await?; - services[i].scan_stdout().await?; - services[i].scan_stderr().await?; - } - } - tokio::time::sleep(std::time::Duration::from_secs(60)).await; - println!("trying to write to stdin!"); - for i in 0..services.len() { - services[i].write_stdin("stop\n".into()).await?; - } - tokio::time::sleep(std::time::Duration::from_secs(30)).await; - for mut service in services { - match service.stop().await { - Ok(_) => println!("lol it was killed"), - Err(_) => println!("it either didn't exist, or failed to kill"), + let mut lock = services[i].lock().await; + lock.start().await?; + lock.scan_stdout().await?; + lock.scan_stderr().await?; + drop(lock); } } + let log_conf = ConfigLogging::StderrTerminal { + level: ConfigLoggingLevel::Info, + }; + let log = log_conf.to_logger("smd")?; + let ctx = Arc::new(SalarymanDContext::from_parts(conf, services)); + let mut api = ApiDescription::new(); + api.register(endpoint_get_config).unwrap(); + api.register(endpoint_post_stdin).unwrap(); + let server = ServerBuilder::new(api, ctx.clone(), log).start()?; + server.await?; Ok(()) } |