ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/salaryman/trunk/src/server/main.rs
Revision: 17
Committed: Fri Aug 1 08:48:17 2025 UTC (2 months, 1 week ago) by yuzu
File size: 11659 byte(s)
Log Message:
unix socket get

File Contents

# Content
1 use clap::Parser;
2 use rayon::prelude::*;
3 use salaryman::{Service, ServiceConf, ServiceState, SalarymanPacket};
4 use serde::{Deserialize, Serialize};
5 use std::{
6 io::{Read, Write},
7 fs::{File, read_to_string},
8 os::unix::net::UnixListener,
9 path::PathBuf,
10 sync::mpsc::{TryRecvError, channel},
11 };
12 use uuid::Uuid;
13
14 #[derive(Parser, Debug)]
15 #[command(version, about, long_about = None)]
16 struct Args {
17 #[arg(
18 short,
19 long,
20 value_name = "FILE",
21 help = "config file override",
22 default_value = "None",
23 )]
24 config: Option<PathBuf>,
25 #[arg(
26 short,
27 long,
28 value_name = "SOCK",
29 help = "UNIX socket to bind",
30 default_value = "/tmp/salaryman.sock"
31 )]
32 socket: PathBuf,
33 }
34
35 pub enum InnerProtocol {
36 Create(ServiceConf),
37 Delete(Uuid),
38 Start(Uuid),
39 Stop(Uuid),
40 Write((Uuid, String)),
41 Scan,
42 Quit,
43 }
44
45 #[derive(Serialize, Deserialize, Clone, Debug)]
46 pub struct Config {
47 pub socket: Option<PathBuf>,
48 pub service: Vec<ServiceConf>,
49 }
50 impl Config {
51 pub fn new() -> Self {
52 Self {
53 socket: None,
54 service: Vec::new(),
55 }
56 }
57 }
58
59 fn load_config(file: &PathBuf) -> Result<Config, Box<dyn std::error::Error>> {
60 let s: String = match read_to_string(file) {
61 Ok(s) => s,
62 Err(_) => {
63 return Err(Box::new(std::io::Error::new(
64 std::io::ErrorKind::NotFound,
65 "cannot find config file",
66 )));
67 }
68 };
69 match toml::from_str(s.as_str()) {
70 Ok(c) => Ok(c),
71 Err(_) => Err(Box::new(std::io::Error::new(
72 std::io::ErrorKind::Other,
73 "unable to parse config file",
74 ))),
75 }
76 }
77
78 fn _save_config(file: &PathBuf, conf: &Config) -> Result<(), Box<dyn std::error::Error>> {
79 let mut f = File::options().create(true).truncate(true).write(true).open(file)?;
80 f.write(toml::to_string(conf)?.as_bytes())?;
81 Ok(())
82 }
83
84 #[cfg(not(target_os = "windows"))] // We cannot build on Windows
85 fn main() -> Result<(), Box<dyn std::error::Error>> {
86 let args = Args::parse();
87
88 // Config Handling
89 let local_conf = PathBuf::from("salaryman.conf");
90 let xdg_conf = PathBuf::from({
91 if let Ok(xcd) = std::env::var("XDG_CONFIG_HOME") {
92 xcd + "/smd/salaryman.toml"
93 } else {
94 if let Ok(homedir) = std::env::var("HOME") {
95 homedir + "/.config/smd/salaryman.conf"
96 } else {
97 "/root/.config/smd/salaryman.conf".into()
98 }
99 }
100 });
101 let sys_conf = PathBuf::from("/etc/salaryman/smd/salaryman.conf");
102 let confpaths = if let Some(path) = args.config.clone() {
103 vec![path]
104 } else {
105 vec![local_conf, xdg_conf, sys_conf]
106 };
107 let get_conf = |confpaths, p: Option<PathBuf>| -> (PathBuf, Config) {
108 for cfile in confpaths {
109 if let Ok(conf) = load_config(&cfile) {
110 return (cfile.to_owned(), conf);
111 }
112 }
113 if let Some(cpath) = p {
114 (cpath.to_owned(), Config::new())
115 } else {
116 (PathBuf::from("salaryman.toml"), Config::new())
117 }
118 };
119 let (config_path, conf) = get_conf(confpaths, args.config.clone());
120
121 let sockaddr = if let Some(sock) = conf.socket {
122 sock
123 } else {
124 args.socket
125 };
126 let mut services: Vec<Service> = Vec::new();
127 for service in conf.service {
128 services.push(service.build()?);
129 }
130 // event loop sender
131 let (etx, erx) = channel();
132 // listener
133 let (ltx, lrx) = channel();
134 // main services event loop
135 let (stx, srx) = channel();
136
137 // for event loop sender to main services event
138 let queue = stx.clone();
139 let lkill = ltx.clone();
140 std::thread::spawn(move || {
141 loop { // event loop sender
142 match erx.try_recv() {
143 Err(TryRecvError::Disconnected) | Ok(true) => {
144 queue.send(InnerProtocol::Quit).unwrap_or_default();
145 lkill.send(true).unwrap_or_default();
146 return;
147 },
148 _ => (),
149 }
150 queue.send(InnerProtocol::Scan).unwrap_or_default();
151 std::thread::sleep(std::time::Duration::from_millis(250));
152 }
153 });
154
155 let ekill = etx.clone();
156 let lkill = ltx.clone();
157 std::thread::spawn(move || {
158 // Main Services Event Loop
159 loop {
160 match srx.try_recv() {
161 Err(TryRecvError::Disconnected) | Ok(InnerProtocol::Quit) => {
162 services.par_iter_mut()
163 .for_each(|service| {
164 if let Err(e) = service.stop() {
165 eprintln!("unable to start service {} with id {} due to error {e}", &service.name(), &service.uuid());
166 }
167 });
168 ekill.send(true).unwrap_or_default();
169 lkill.send(true).unwrap_or_default();
170 return;
171 },
172 Ok(InnerProtocol::Create(conf)) => if let Ok(s) = conf.build() { services.push(s); },
173 Ok(InnerProtocol::Start(u)) => {
174 services.par_iter_mut()
175 .filter(|s| s.uuid() == &u)
176 .for_each(|s| {
177 if !s.started() {
178 if let Ok(_) = s.start() {
179 return;
180 }
181 }
182 });
183 },
184 Ok(InnerProtocol::Stop(u)) => {
185 services.par_iter_mut()
186 .filter(|s| s.uuid() == &u)
187 .for_each(|s| {
188 if s.started() {
189 if let Ok(_) = s.stop() {
190 return;
191 }
192 }
193 });
194 },
195 Ok(InnerProtocol::Write((u, buf))) => {
196 services.par_iter_mut()
197 .filter(|s| s.uuid() == &u)
198 .for_each(|s| {
199 if let Ok(_) = s.write_stdin(&buf) {
200 return;
201 }
202 });
203 },
204 Ok(InnerProtocol::Delete(u)) => {
205 if let Some(i) = services.par_iter_mut()
206 .position_first(|s| s.uuid() == &u) {
207 if let Err(e) = services[i].stop() {
208 eprintln!("unable to stop service {} with id {} due to error {e}", &services[i].name(), &services[i].uuid());
209 };
210 services.swap_remove(i);
211 }
212 },
213 Ok(InnerProtocol::Scan) => {
214 services.par_iter_mut()
215 .for_each(|service| {
216 match service.state() {
217 ServiceState::Failed => {
218 if let Err(e) = service.restart() {
219 eprintln!("unable to restart service {} with id {} due to error {e}", &service.name(), &service.uuid());
220 }
221 },
222 _ => (),
223 }
224 });
225 },
226 _ => std::thread::sleep(std::time::Duration::from_millis(100)),
227 }
228 }
229 });
230 let listen = UnixListener::bind(sockaddr)?;
231 let queue = stx.clone();
232 let ekill = etx.clone();
233 for stream in listen.incoming() {
234 match stream {
235 Ok(mut stream) => {
236 match lrx.try_recv() {
237 Err(TryRecvError::Disconnected) | Ok(true) => {
238 ekill.send(true).unwrap_or_default();
239 queue.send(InnerProtocol::Quit).unwrap_or_default();
240 return Ok(());
241 },
242 _ => (),
243 }
244 let squeue = queue.clone();
245 std::thread::spawn(move || {
246 let mut buf = String::new();
247 if let Ok(len) = stream.read_to_string(&mut buf) {
248 if len > 0 {
249 let resp = match SalarymanPacket::deserialize(&buf.as_bytes()) {
250 Ok(SalarymanPacket::Create(sc)) => {
251 squeue.send(InnerProtocol::Create(sc.to_owned())).unwrap_or_default();
252 SalarymanPacket::response(&SalarymanPacket::Create(sc))
253 },
254 Ok(SalarymanPacket::Delete(u)) => {
255 squeue.send(InnerProtocol::Delete(u.to_owned())).unwrap_or_default();
256 SalarymanPacket::response(&SalarymanPacket::Delete(u))
257 },
258 Ok(SalarymanPacket::Start(u)) => {
259 squeue.send(InnerProtocol::Start(u.to_owned())).unwrap_or_default();
260 SalarymanPacket::response(&SalarymanPacket::Start(u))
261 },
262 Ok(SalarymanPacket::Stop(u)) => {
263 squeue.send(InnerProtocol::Stop(u.to_owned())).unwrap_or_default();
264 SalarymanPacket::response(&SalarymanPacket::Stop(u))
265 },
266 Ok(SalarymanPacket::Restart(u)) => {
267 squeue.send(InnerProtocol::Stop(u.to_owned())).unwrap_or_default();
268 squeue.send(InnerProtocol::Start(u.to_owned())).unwrap_or_default();
269 SalarymanPacket::response(&SalarymanPacket::Restart(u))
270 },
271 Ok(SalarymanPacket::Quit) => {
272 squeue.send(InnerProtocol::Quit).unwrap_or_default();
273 SalarymanPacket::response(&SalarymanPacket::Quit)
274 },
275 Ok(b) => SalarymanPacket::response(&b),
276 Err(_) => SalarymanPacket::Invalid,
277 };
278 let resp = if let Ok(i) = SalarymanPacket::serialize(&resp) {
279 i
280 } else {
281 Vec::new()
282 };
283 if let Ok(len) = stream.write(&resp) {
284 if len > 0 {
285 stream.flush().unwrap_or_default();
286 }
287 }
288 }
289 }
290 });
291 },
292 Err(_) => {
293 match lrx.try_recv() {
294 Err(TryRecvError::Disconnected) | Ok(true) => {
295 ekill.send(true).unwrap_or_default();
296 queue.send(InnerProtocol::Quit).unwrap_or_default();
297 return Ok(());
298 },
299 _ => (),
300 }
301 },
302 }
303 }
304 Ok(())
305 }
306