1 |
use serde::{Deserialize, Serialize}; |
2 |
use tokio::{ |
3 |
io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, |
4 |
process::{Child, Command}, |
5 |
sync::{ |
6 |
Mutex, |
7 |
mpsc::{Receiver, channel}, |
8 |
}, |
9 |
task::spawn, |
10 |
}; |
11 |
|
12 |
use std::{path::PathBuf, process::Stdio, sync::Arc}; |
13 |
|
14 |
#[derive(Serialize, Deserialize, Clone, Debug)] |
15 |
pub struct ServiceConf { |
16 |
pub name: String, |
17 |
pub command: String, |
18 |
pub args: Option<String>, |
19 |
pub directory: Option<PathBuf>, |
20 |
pub autostart: bool, |
21 |
} |
22 |
impl Default for ServiceConf { |
23 |
fn default() -> Self { |
24 |
Self::new() |
25 |
} |
26 |
} |
27 |
impl ServiceConf { |
28 |
/** |
29 |
* Returns a new empty `ServiceConf` |
30 |
*/ |
31 |
pub fn new() -> Self { |
32 |
Self { |
33 |
name: String::new(), |
34 |
command: String::new(), |
35 |
args: None, |
36 |
directory: None, |
37 |
autostart: false, |
38 |
} |
39 |
} |
40 |
/** |
41 |
* Returns a new `ServiceConf` from parts. |
42 |
*/ |
43 |
pub fn from_parts( |
44 |
name: String, |
45 |
command: String, |
46 |
args: Option<String>, |
47 |
directory: Option<PathBuf>, |
48 |
autostart: bool, |
49 |
) -> Self { |
50 |
Self { |
51 |
name, |
52 |
command, |
53 |
args, |
54 |
directory, |
55 |
autostart, |
56 |
} |
57 |
} |
58 |
} |
59 |
|
60 |
#[derive(Debug)] |
61 |
pub struct Service { |
62 |
conf: ServiceConf, |
63 |
proc: Option<Arc<Mutex<Child>>>, |
64 |
stdout: Option<Arc<Mutex<Receiver<String>>>>, |
65 |
stderr: Option<Arc<Mutex<Receiver<String>>>>, |
66 |
} |
67 |
impl Default for Service { |
68 |
fn default() -> Self { |
69 |
Self::new() |
70 |
} |
71 |
} |
72 |
impl Service { |
73 |
/** |
74 |
* Returns a new empty `Service` |
75 |
*/ |
76 |
pub fn new() -> Self { |
77 |
Self { |
78 |
conf: ServiceConf::default(), |
79 |
proc: None, |
80 |
stdout: None, |
81 |
stderr: None, |
82 |
} |
83 |
} |
84 |
/** |
85 |
* Returns a `Service` made from a `ServiceConf`. |
86 |
*/ |
87 |
pub fn from_conf(conf: &ServiceConf) -> Self { |
88 |
Self { |
89 |
conf: conf.clone(), |
90 |
proc: None, |
91 |
stdout: None, |
92 |
stderr: None, |
93 |
} |
94 |
} |
95 |
/** |
96 |
* Returns the name of the service |
97 |
*/ |
98 |
pub async fn name(&self) -> &str { |
99 |
&self.conf.name |
100 |
} |
101 |
/** |
102 |
* Uses `tokio::process::Command` to start the service. |
103 |
*/ |
104 |
pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> { |
105 |
if self.proc.is_some() { |
106 |
return Err(Box::new(std::io::Error::new( |
107 |
std::io::ErrorKind::AlreadyExists, |
108 |
"Process Already Exists", |
109 |
))); |
110 |
} |
111 |
let cmd = &self.conf.command; |
112 |
let args = if let Some(a) = &self.conf.args { |
113 |
a.split_whitespace() |
114 |
} else { |
115 |
"".split_whitespace() |
116 |
}; |
117 |
let cwd = if let Some(c) = &self.conf.directory { |
118 |
c |
119 |
} else { |
120 |
&PathBuf::from("/") |
121 |
}; |
122 |
let child = Command::new(cmd) |
123 |
.args(args) |
124 |
.current_dir(cwd) |
125 |
.stdin(Stdio::piped()) |
126 |
.stdout(Stdio::piped()) |
127 |
.stderr(Stdio::piped()) |
128 |
.spawn()?; |
129 |
self.proc = Some(Arc::new(Mutex::new(child))); |
130 |
Ok(()) |
131 |
} |
132 |
/** |
133 |
* Returns true when process is started and false when process is stopped. |
134 |
*/ |
135 |
pub async fn started(&self) -> bool { |
136 |
self.proc.is_some() |
137 |
} |
138 |
/** |
139 |
* Invokes kill on the service process |
140 |
*/ |
141 |
pub async fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> { |
142 |
if let Some(proc) = self.proc.clone() { |
143 |
let mut lock = proc.lock().await; |
144 |
lock.kill().await?; |
145 |
drop(lock); |
146 |
self.proc = None; |
147 |
Ok(()) |
148 |
} else { |
149 |
Err(Box::new(std::io::Error::new( |
150 |
std::io::ErrorKind::NotFound, |
151 |
"No Process Associated with Service", |
152 |
))) |
153 |
} |
154 |
} |
155 |
/** |
156 |
* Restarts service process |
157 |
*/ |
158 |
pub async fn restart(&mut self) -> Result<(), Box<dyn std::error::Error>> { |
159 |
self.stop().await?; |
160 |
self.start().await?; |
161 |
Ok(()) |
162 |
} |
163 |
/** |
164 |
* Takes control of service process' stdout file handle and spawns a new task to continuously |
165 |
* scan it. |
166 |
*/ |
167 |
pub async fn scan_stdout(&mut self) -> Result<(), Box<dyn std::error::Error>> { |
168 |
if let Some(proc) = self.proc.clone() { |
169 |
let mut lock = proc.lock().await; |
170 |
let stdout = if let Some(stdout) = lock.stdout.take() { |
171 |
stdout |
172 |
} else { |
173 |
return Err(Box::new(std::io::Error::new( |
174 |
std::io::ErrorKind::NotFound, |
175 |
"No stdout handle associated with process", |
176 |
))); |
177 |
}; |
178 |
drop(lock); |
179 |
let (tx, rx) = channel(100); |
180 |
let sname = self.conf.name.clone(); |
181 |
spawn(async move { |
182 |
let mut br = BufReader::new(stdout).lines(); |
183 |
while let Ok(Some(line)) = br.next_line().await { |
184 |
println!("{} :: {}", &sname, &line); |
185 |
if let Err(_) = tx.send(line).await { |
186 |
return; |
187 |
}; |
188 |
} |
189 |
}); |
190 |
self.stdout = Some(Arc::new(Mutex::new(rx))); |
191 |
Ok(()) |
192 |
} else { |
193 |
Err(Box::new(std::io::Error::new( |
194 |
std::io::ErrorKind::NotFound, |
195 |
"No Process Associated with Service", |
196 |
))) |
197 |
} |
198 |
} |
199 |
/** |
200 |
* Takes control of service process' stderr file handle and spawns a new task to continuously |
201 |
* scan it. |
202 |
*/ |
203 |
pub async fn scan_stderr(&mut self) -> Result<(), Box<dyn std::error::Error>> { |
204 |
if let Some(proc) = self.proc.clone() { |
205 |
let mut lock = proc.lock().await; |
206 |
let stderr = if let Some(stderr) = lock.stderr.take() { |
207 |
stderr |
208 |
} else { |
209 |
return Err(Box::new(std::io::Error::new( |
210 |
std::io::ErrorKind::NotFound, |
211 |
"No stderr handle associated with process", |
212 |
))); |
213 |
}; |
214 |
drop(lock); |
215 |
let (tx, rx) = channel(100); |
216 |
let sname = self.conf.name.clone(); |
217 |
spawn(async move { |
218 |
let mut br = BufReader::new(stderr).lines(); |
219 |
while let Ok(Some(line)) = br.next_line().await { |
220 |
println!("ERR :: {} :: {}", &sname, &line); |
221 |
if let Err(_) = tx.send(line).await { |
222 |
return; |
223 |
}; |
224 |
} |
225 |
}); |
226 |
self.stderr = Some(Arc::new(Mutex::new(rx))); |
227 |
Ok(()) |
228 |
} else { |
229 |
Err(Box::new(std::io::Error::new( |
230 |
std::io::ErrorKind::NotFound, |
231 |
"No Process Associated with Service", |
232 |
))) |
233 |
} |
234 |
} |
235 |
/** |
236 |
* Writes to the service process' stdin, if it exists. |
237 |
*/ |
238 |
pub async fn write_stdin(&mut self, buf: String) -> Result<(), Box<dyn std::error::Error>> { |
239 |
if let Some(proc) = self.proc.clone() { |
240 |
let mut lock = proc.lock().await; |
241 |
let stdin = if let Some(stdin) = lock.stdin.as_mut() { |
242 |
stdin |
243 |
} else { |
244 |
return Err(Box::new(std::io::Error::new( |
245 |
std::io::ErrorKind::NotFound, |
246 |
"No stdin handle associated with process", |
247 |
))); |
248 |
}; |
249 |
stdin.write(&buf.as_bytes()).await?; |
250 |
Ok(()) |
251 |
} else { |
252 |
Err(Box::new(std::io::Error::new( |
253 |
std::io::ErrorKind::NotFound, |
254 |
"No Process Associated with Service", |
255 |
))) |
256 |
} |
257 |
} |
258 |
} |