ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/salaryman/trunk/src/server/main.rs
Revision: 17
Committed: Fri Aug 1 08:48:17 2025 UTC (2 months, 1 week ago) by yuzu
File size: 11659 byte(s)
Log Message:
unix socket get

File Contents

# User Rev Content
1 yuzu 9 use clap::Parser;
2 yuzu 17 use rayon::prelude::*;
3     use salaryman::{Service, ServiceConf, ServiceState, SalarymanPacket};
4 yuzu 9 use serde::{Deserialize, Serialize};
5 yuzu 14 use std::{
6 yuzu 17 io::{Read, Write},
7     fs::{File, read_to_string},
8     os::unix::net::UnixListener,
9 yuzu 14 path::PathBuf,
10 yuzu 17 sync::mpsc::{TryRecvError, channel},
11 yuzu 14 };
12 yuzu 17 use uuid::Uuid;
13 yuzu 9
14     #[derive(Parser, Debug)]
15     #[command(version, about, long_about = None)]
16     struct Args {
17     #[arg(
18     short,
19     long,
20     value_name = "FILE",
21     help = "config file override",
22 yuzu 17 default_value = "None",
23 yuzu 9 )]
24 yuzu 17 config: Option<PathBuf>,
25 yuzu 9 #[arg(
26     short,
27     long,
28 yuzu 16 value_name = "SOCK",
29     help = "UNIX socket to bind",
30     default_value = "/tmp/salaryman.sock"
31 yuzu 9 )]
32 yuzu 16 socket: PathBuf,
33 yuzu 9 }
34    
35 yuzu 17 pub enum InnerProtocol {
36 yuzu 16 Create(ServiceConf),
37 yuzu 17 Delete(Uuid),
38     Start(Uuid),
39     Stop(Uuid),
40     Write((Uuid, String)),
41     Scan,
42     Quit,
43 yuzu 9 }
44    
45 yuzu 16 #[derive(Serialize, Deserialize, Clone, Debug)]
46 yuzu 13 pub struct Config {
47 yuzu 16 pub socket: Option<PathBuf>,
48 yuzu 13 pub service: Vec<ServiceConf>,
49     }
50     impl Config {
51     pub fn new() -> Self {
52     Self {
53 yuzu 16 socket: None,
54 yuzu 13 service: Vec::new(),
55     }
56     }
57     }
58    
59 yuzu 16 fn load_config(file: &PathBuf) -> Result<Config, Box<dyn std::error::Error>> {
60     let s: String = match read_to_string(file) {
61 yuzu 9 Ok(s) => s,
62 yuzu 12 Err(_) => {
63     return Err(Box::new(std::io::Error::new(
64     std::io::ErrorKind::NotFound,
65     "cannot find config file",
66     )));
67     }
68 yuzu 9 };
69     match toml::from_str(s.as_str()) {
70 yuzu 11 Ok(c) => Ok(c),
71 yuzu 12 Err(_) => Err(Box::new(std::io::Error::new(
72     std::io::ErrorKind::Other,
73     "unable to parse config file",
74     ))),
75 yuzu 9 }
76     }
77    
78 yuzu 17 fn _save_config(file: &PathBuf, conf: &Config) -> Result<(), Box<dyn std::error::Error>> {
79     let mut f = File::options().create(true).truncate(true).write(true).open(file)?;
80     f.write(toml::to_string(conf)?.as_bytes())?;
81     Ok(())
82     }
83    
84     #[cfg(not(target_os = "windows"))] // We cannot build on Windows
85 yuzu 16 fn main() -> Result<(), Box<dyn std::error::Error>> {
86 yuzu 9 let args = Args::parse();
87 yuzu 17
88     // Config Handling
89     let local_conf = PathBuf::from("salaryman.conf");
90     let xdg_conf = PathBuf::from({
91     if let Ok(xcd) = std::env::var("XDG_CONFIG_HOME") {
92     xcd + "/smd/salaryman.toml"
93     } else {
94     if let Ok(homedir) = std::env::var("HOME") {
95     homedir + "/.config/smd/salaryman.conf"
96     } else {
97     "/root/.config/smd/salaryman.conf".into()
98     }
99     }
100     });
101     let sys_conf = PathBuf::from("/etc/salaryman/smd/salaryman.conf");
102     let confpaths = if let Some(path) = args.config.clone() {
103     vec![path]
104     } else {
105     vec![local_conf, xdg_conf, sys_conf]
106     };
107     let get_conf = |confpaths, p: Option<PathBuf>| -> (PathBuf, Config) {
108     for cfile in confpaths {
109     if let Ok(conf) = load_config(&cfile) {
110     return (cfile.to_owned(), conf);
111     }
112     }
113     if let Some(cpath) = p {
114     (cpath.to_owned(), Config::new())
115     } else {
116     (PathBuf::from("salaryman.toml"), Config::new())
117     }
118     };
119     let (config_path, conf) = get_conf(confpaths, args.config.clone());
120    
121     let sockaddr = if let Some(sock) = conf.socket {
122 yuzu 16 sock
123 yuzu 14 } else {
124 yuzu 16 args.socket
125 yuzu 14 };
126 yuzu 16 let mut services: Vec<Service> = Vec::new();
127     for service in conf.service {
128     services.push(service.build()?);
129 yuzu 14 }
130 yuzu 17 // event loop sender
131     let (etx, erx) = channel();
132     // listener
133     let (ltx, lrx) = channel();
134     // main services event loop
135     let (stx, srx) = channel();
136    
137     // for event loop sender to main services event
138     let queue = stx.clone();
139     let lkill = ltx.clone();
140     std::thread::spawn(move || {
141     loop { // event loop sender
142     match erx.try_recv() {
143     Err(TryRecvError::Disconnected) | Ok(true) => {
144     queue.send(InnerProtocol::Quit).unwrap_or_default();
145     lkill.send(true).unwrap_or_default();
146     return;
147     },
148     _ => (),
149     }
150     queue.send(InnerProtocol::Scan).unwrap_or_default();
151     std::thread::sleep(std::time::Duration::from_millis(250));
152     }
153     });
154    
155     let ekill = etx.clone();
156     let lkill = ltx.clone();
157     std::thread::spawn(move || {
158     // Main Services Event Loop
159     loop {
160     match srx.try_recv() {
161     Err(TryRecvError::Disconnected) | Ok(InnerProtocol::Quit) => {
162     services.par_iter_mut()
163     .for_each(|service| {
164     if let Err(e) = service.stop() {
165     eprintln!("unable to start service {} with id {} due to error {e}", &service.name(), &service.uuid());
166     }
167     });
168     ekill.send(true).unwrap_or_default();
169     lkill.send(true).unwrap_or_default();
170     return;
171     },
172     Ok(InnerProtocol::Create(conf)) => if let Ok(s) = conf.build() { services.push(s); },
173     Ok(InnerProtocol::Start(u)) => {
174     services.par_iter_mut()
175     .filter(|s| s.uuid() == &u)
176     .for_each(|s| {
177     if !s.started() {
178     if let Ok(_) = s.start() {
179     return;
180     }
181     }
182     });
183     },
184     Ok(InnerProtocol::Stop(u)) => {
185     services.par_iter_mut()
186     .filter(|s| s.uuid() == &u)
187     .for_each(|s| {
188     if s.started() {
189     if let Ok(_) = s.stop() {
190     return;
191     }
192     }
193     });
194     },
195     Ok(InnerProtocol::Write((u, buf))) => {
196     services.par_iter_mut()
197     .filter(|s| s.uuid() == &u)
198     .for_each(|s| {
199     if let Ok(_) = s.write_stdin(&buf) {
200     return;
201     }
202     });
203     },
204     Ok(InnerProtocol::Delete(u)) => {
205     if let Some(i) = services.par_iter_mut()
206     .position_first(|s| s.uuid() == &u) {
207     if let Err(e) = services[i].stop() {
208     eprintln!("unable to stop service {} with id {} due to error {e}", &services[i].name(), &services[i].uuid());
209     };
210     services.swap_remove(i);
211     }
212     },
213     Ok(InnerProtocol::Scan) => {
214     services.par_iter_mut()
215     .for_each(|service| {
216     match service.state() {
217     ServiceState::Failed => {
218     if let Err(e) = service.restart() {
219     eprintln!("unable to restart service {} with id {} due to error {e}", &service.name(), &service.uuid());
220     }
221     },
222     _ => (),
223     }
224     });
225     },
226     _ => std::thread::sleep(std::time::Duration::from_millis(100)),
227     }
228     }
229     });
230     let listen = UnixListener::bind(sockaddr)?;
231     let queue = stx.clone();
232     let ekill = etx.clone();
233     for stream in listen.incoming() {
234     match stream {
235     Ok(mut stream) => {
236     match lrx.try_recv() {
237     Err(TryRecvError::Disconnected) | Ok(true) => {
238     ekill.send(true).unwrap_or_default();
239     queue.send(InnerProtocol::Quit).unwrap_or_default();
240     return Ok(());
241     },
242 yuzu 16 _ => (),
243     }
244 yuzu 17 let squeue = queue.clone();
245     std::thread::spawn(move || {
246     let mut buf = String::new();
247     if let Ok(len) = stream.read_to_string(&mut buf) {
248     if len > 0 {
249     let resp = match SalarymanPacket::deserialize(&buf.as_bytes()) {
250     Ok(SalarymanPacket::Create(sc)) => {
251     squeue.send(InnerProtocol::Create(sc.to_owned())).unwrap_or_default();
252     SalarymanPacket::response(&SalarymanPacket::Create(sc))
253     },
254     Ok(SalarymanPacket::Delete(u)) => {
255     squeue.send(InnerProtocol::Delete(u.to_owned())).unwrap_or_default();
256     SalarymanPacket::response(&SalarymanPacket::Delete(u))
257     },
258     Ok(SalarymanPacket::Start(u)) => {
259     squeue.send(InnerProtocol::Start(u.to_owned())).unwrap_or_default();
260     SalarymanPacket::response(&SalarymanPacket::Start(u))
261     },
262     Ok(SalarymanPacket::Stop(u)) => {
263     squeue.send(InnerProtocol::Stop(u.to_owned())).unwrap_or_default();
264     SalarymanPacket::response(&SalarymanPacket::Stop(u))
265     },
266     Ok(SalarymanPacket::Restart(u)) => {
267     squeue.send(InnerProtocol::Stop(u.to_owned())).unwrap_or_default();
268     squeue.send(InnerProtocol::Start(u.to_owned())).unwrap_or_default();
269     SalarymanPacket::response(&SalarymanPacket::Restart(u))
270     },
271     Ok(SalarymanPacket::Quit) => {
272     squeue.send(InnerProtocol::Quit).unwrap_or_default();
273     SalarymanPacket::response(&SalarymanPacket::Quit)
274     },
275     Ok(b) => SalarymanPacket::response(&b),
276     Err(_) => SalarymanPacket::Invalid,
277     };
278     let resp = if let Ok(i) = SalarymanPacket::serialize(&resp) {
279     i
280     } else {
281     Vec::new()
282     };
283     if let Ok(len) = stream.write(&resp) {
284     if len > 0 {
285     stream.flush().unwrap_or_default();
286     }
287     }
288     }
289     }
290     });
291     },
292     Err(_) => {
293     match lrx.try_recv() {
294     Err(TryRecvError::Disconnected) | Ok(true) => {
295     ekill.send(true).unwrap_or_default();
296     queue.send(InnerProtocol::Quit).unwrap_or_default();
297     return Ok(());
298     },
299     _ => (),
300     }
301     },
302     }
303 yuzu 9 }
304 yuzu 17 Ok(())
305 yuzu 9 }
306 yuzu 17