1 |
use clap::Parser; |
2 |
use rayon::prelude::*; |
3 |
use salaryman::{Service, ServiceConf, ServiceState, SalarymanPacket}; |
4 |
use serde::{Deserialize, Serialize}; |
5 |
use std::{ |
6 |
io::{Read, Write}, |
7 |
fs::{File, read_to_string}, |
8 |
os::unix::net::UnixListener, |
9 |
path::PathBuf, |
10 |
sync::mpsc::{TryRecvError, channel}, |
11 |
}; |
12 |
use uuid::Uuid; |
13 |
|
14 |
#[derive(Parser, Debug)] |
15 |
#[command(version, about, long_about = None)] |
16 |
struct Args { |
17 |
#[arg( |
18 |
short, |
19 |
long, |
20 |
value_name = "FILE", |
21 |
help = "config file override", |
22 |
default_value = "None", |
23 |
)] |
24 |
config: Option<PathBuf>, |
25 |
#[arg( |
26 |
short, |
27 |
long, |
28 |
value_name = "SOCK", |
29 |
help = "UNIX socket to bind", |
30 |
default_value = "/tmp/salaryman.sock" |
31 |
)] |
32 |
socket: PathBuf, |
33 |
} |
34 |
|
35 |
pub enum InnerProtocol { |
36 |
Create(ServiceConf), |
37 |
Delete(Uuid), |
38 |
Start(Uuid), |
39 |
Stop(Uuid), |
40 |
Write((Uuid, String)), |
41 |
Scan, |
42 |
Quit, |
43 |
} |
44 |
|
45 |
#[derive(Serialize, Deserialize, Clone, Debug)] |
46 |
pub struct Config { |
47 |
pub socket: Option<PathBuf>, |
48 |
pub service: Vec<ServiceConf>, |
49 |
} |
50 |
impl Config { |
51 |
pub fn new() -> Self { |
52 |
Self { |
53 |
socket: None, |
54 |
service: Vec::new(), |
55 |
} |
56 |
} |
57 |
} |
58 |
|
59 |
fn load_config(file: &PathBuf) -> Result<Config, Box<dyn std::error::Error>> { |
60 |
let s: String = match read_to_string(file) { |
61 |
Ok(s) => s, |
62 |
Err(_) => { |
63 |
return Err(Box::new(std::io::Error::new( |
64 |
std::io::ErrorKind::NotFound, |
65 |
"cannot find config file", |
66 |
))); |
67 |
} |
68 |
}; |
69 |
match toml::from_str(s.as_str()) { |
70 |
Ok(c) => Ok(c), |
71 |
Err(_) => Err(Box::new(std::io::Error::new( |
72 |
std::io::ErrorKind::Other, |
73 |
"unable to parse config file", |
74 |
))), |
75 |
} |
76 |
} |
77 |
|
78 |
fn _save_config(file: &PathBuf, conf: &Config) -> Result<(), Box<dyn std::error::Error>> { |
79 |
let mut f = File::options().create(true).truncate(true).write(true).open(file)?; |
80 |
f.write(toml::to_string(conf)?.as_bytes())?; |
81 |
Ok(()) |
82 |
} |
83 |
|
84 |
#[cfg(not(target_os = "windows"))] // We cannot build on Windows |
85 |
fn main() -> Result<(), Box<dyn std::error::Error>> { |
86 |
let args = Args::parse(); |
87 |
|
88 |
// Config Handling |
89 |
let local_conf = PathBuf::from("salaryman.conf"); |
90 |
let xdg_conf = PathBuf::from({ |
91 |
if let Ok(xcd) = std::env::var("XDG_CONFIG_HOME") { |
92 |
xcd + "/smd/salaryman.toml" |
93 |
} else { |
94 |
if let Ok(homedir) = std::env::var("HOME") { |
95 |
homedir + "/.config/smd/salaryman.conf" |
96 |
} else { |
97 |
"/root/.config/smd/salaryman.conf".into() |
98 |
} |
99 |
} |
100 |
}); |
101 |
let sys_conf = PathBuf::from("/etc/salaryman/smd/salaryman.conf"); |
102 |
let confpaths = if let Some(path) = args.config.clone() { |
103 |
vec![path] |
104 |
} else { |
105 |
vec![local_conf, xdg_conf, sys_conf] |
106 |
}; |
107 |
let get_conf = |confpaths, p: Option<PathBuf>| -> (PathBuf, Config) { |
108 |
for cfile in confpaths { |
109 |
if let Ok(conf) = load_config(&cfile) { |
110 |
return (cfile.to_owned(), conf); |
111 |
} |
112 |
} |
113 |
if let Some(cpath) = p { |
114 |
(cpath.to_owned(), Config::new()) |
115 |
} else { |
116 |
(PathBuf::from("salaryman.toml"), Config::new()) |
117 |
} |
118 |
}; |
119 |
let (config_path, conf) = get_conf(confpaths, args.config.clone()); |
120 |
|
121 |
let sockaddr = if let Some(sock) = conf.socket { |
122 |
sock |
123 |
} else { |
124 |
args.socket |
125 |
}; |
126 |
let mut services: Vec<Service> = Vec::new(); |
127 |
for service in conf.service { |
128 |
services.push(service.build()?); |
129 |
} |
130 |
// event loop sender |
131 |
let (etx, erx) = channel(); |
132 |
// listener |
133 |
let (ltx, lrx) = channel(); |
134 |
// main services event loop |
135 |
let (stx, srx) = channel(); |
136 |
|
137 |
// for event loop sender to main services event |
138 |
let queue = stx.clone(); |
139 |
let lkill = ltx.clone(); |
140 |
std::thread::spawn(move || { |
141 |
loop { // event loop sender |
142 |
match erx.try_recv() { |
143 |
Err(TryRecvError::Disconnected) | Ok(true) => { |
144 |
queue.send(InnerProtocol::Quit).unwrap_or_default(); |
145 |
lkill.send(true).unwrap_or_default(); |
146 |
return; |
147 |
}, |
148 |
_ => (), |
149 |
} |
150 |
queue.send(InnerProtocol::Scan).unwrap_or_default(); |
151 |
std::thread::sleep(std::time::Duration::from_millis(250)); |
152 |
} |
153 |
}); |
154 |
|
155 |
let ekill = etx.clone(); |
156 |
let lkill = ltx.clone(); |
157 |
std::thread::spawn(move || { |
158 |
// Main Services Event Loop |
159 |
loop { |
160 |
match srx.try_recv() { |
161 |
Err(TryRecvError::Disconnected) | Ok(InnerProtocol::Quit) => { |
162 |
services.par_iter_mut() |
163 |
.for_each(|service| { |
164 |
if let Err(e) = service.stop() { |
165 |
eprintln!("unable to start service {} with id {} due to error {e}", &service.name(), &service.uuid()); |
166 |
} |
167 |
}); |
168 |
ekill.send(true).unwrap_or_default(); |
169 |
lkill.send(true).unwrap_or_default(); |
170 |
return; |
171 |
}, |
172 |
Ok(InnerProtocol::Create(conf)) => if let Ok(s) = conf.build() { services.push(s); }, |
173 |
Ok(InnerProtocol::Start(u)) => { |
174 |
services.par_iter_mut() |
175 |
.filter(|s| s.uuid() == &u) |
176 |
.for_each(|s| { |
177 |
if !s.started() { |
178 |
if let Ok(_) = s.start() { |
179 |
return; |
180 |
} |
181 |
} |
182 |
}); |
183 |
}, |
184 |
Ok(InnerProtocol::Stop(u)) => { |
185 |
services.par_iter_mut() |
186 |
.filter(|s| s.uuid() == &u) |
187 |
.for_each(|s| { |
188 |
if s.started() { |
189 |
if let Ok(_) = s.stop() { |
190 |
return; |
191 |
} |
192 |
} |
193 |
}); |
194 |
}, |
195 |
Ok(InnerProtocol::Write((u, buf))) => { |
196 |
services.par_iter_mut() |
197 |
.filter(|s| s.uuid() == &u) |
198 |
.for_each(|s| { |
199 |
if let Ok(_) = s.write_stdin(&buf) { |
200 |
return; |
201 |
} |
202 |
}); |
203 |
}, |
204 |
Ok(InnerProtocol::Delete(u)) => { |
205 |
if let Some(i) = services.par_iter_mut() |
206 |
.position_first(|s| s.uuid() == &u) { |
207 |
if let Err(e) = services[i].stop() { |
208 |
eprintln!("unable to stop service {} with id {} due to error {e}", &services[i].name(), &services[i].uuid()); |
209 |
}; |
210 |
services.swap_remove(i); |
211 |
} |
212 |
}, |
213 |
Ok(InnerProtocol::Scan) => { |
214 |
services.par_iter_mut() |
215 |
.for_each(|service| { |
216 |
match service.state() { |
217 |
ServiceState::Failed => { |
218 |
if let Err(e) = service.restart() { |
219 |
eprintln!("unable to restart service {} with id {} due to error {e}", &service.name(), &service.uuid()); |
220 |
} |
221 |
}, |
222 |
_ => (), |
223 |
} |
224 |
}); |
225 |
}, |
226 |
_ => std::thread::sleep(std::time::Duration::from_millis(100)), |
227 |
} |
228 |
} |
229 |
}); |
230 |
let listen = UnixListener::bind(sockaddr)?; |
231 |
let queue = stx.clone(); |
232 |
let ekill = etx.clone(); |
233 |
for stream in listen.incoming() { |
234 |
match stream { |
235 |
Ok(mut stream) => { |
236 |
match lrx.try_recv() { |
237 |
Err(TryRecvError::Disconnected) | Ok(true) => { |
238 |
ekill.send(true).unwrap_or_default(); |
239 |
queue.send(InnerProtocol::Quit).unwrap_or_default(); |
240 |
return Ok(()); |
241 |
}, |
242 |
_ => (), |
243 |
} |
244 |
let squeue = queue.clone(); |
245 |
std::thread::spawn(move || { |
246 |
let mut buf = String::new(); |
247 |
if let Ok(len) = stream.read_to_string(&mut buf) { |
248 |
if len > 0 { |
249 |
let resp = match SalarymanPacket::deserialize(&buf.as_bytes()) { |
250 |
Ok(SalarymanPacket::Create(sc)) => { |
251 |
squeue.send(InnerProtocol::Create(sc.to_owned())).unwrap_or_default(); |
252 |
SalarymanPacket::response(&SalarymanPacket::Create(sc)) |
253 |
}, |
254 |
Ok(SalarymanPacket::Delete(u)) => { |
255 |
squeue.send(InnerProtocol::Delete(u.to_owned())).unwrap_or_default(); |
256 |
SalarymanPacket::response(&SalarymanPacket::Delete(u)) |
257 |
}, |
258 |
Ok(SalarymanPacket::Start(u)) => { |
259 |
squeue.send(InnerProtocol::Start(u.to_owned())).unwrap_or_default(); |
260 |
SalarymanPacket::response(&SalarymanPacket::Start(u)) |
261 |
}, |
262 |
Ok(SalarymanPacket::Stop(u)) => { |
263 |
squeue.send(InnerProtocol::Stop(u.to_owned())).unwrap_or_default(); |
264 |
SalarymanPacket::response(&SalarymanPacket::Stop(u)) |
265 |
}, |
266 |
Ok(SalarymanPacket::Restart(u)) => { |
267 |
squeue.send(InnerProtocol::Stop(u.to_owned())).unwrap_or_default(); |
268 |
squeue.send(InnerProtocol::Start(u.to_owned())).unwrap_or_default(); |
269 |
SalarymanPacket::response(&SalarymanPacket::Restart(u)) |
270 |
}, |
271 |
Ok(SalarymanPacket::Quit) => { |
272 |
squeue.send(InnerProtocol::Quit).unwrap_or_default(); |
273 |
SalarymanPacket::response(&SalarymanPacket::Quit) |
274 |
}, |
275 |
Ok(b) => SalarymanPacket::response(&b), |
276 |
Err(_) => SalarymanPacket::Invalid, |
277 |
}; |
278 |
let resp = if let Ok(i) = SalarymanPacket::serialize(&resp) { |
279 |
i |
280 |
} else { |
281 |
Vec::new() |
282 |
}; |
283 |
if let Ok(len) = stream.write(&resp) { |
284 |
if len > 0 { |
285 |
stream.flush().unwrap_or_default(); |
286 |
} |
287 |
} |
288 |
} |
289 |
} |
290 |
}); |
291 |
}, |
292 |
Err(_) => { |
293 |
match lrx.try_recv() { |
294 |
Err(TryRecvError::Disconnected) | Ok(true) => { |
295 |
ekill.send(true).unwrap_or_default(); |
296 |
queue.send(InnerProtocol::Quit).unwrap_or_default(); |
297 |
return Ok(()); |
298 |
}, |
299 |
_ => (), |
300 |
} |
301 |
}, |
302 |
} |
303 |
} |
304 |
Ok(()) |
305 |
} |
306 |
|