ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/salaryman/trunk/src/service.rs
Revision: 14
Committed: Sat Jul 12 06:17:38 2025 UTC (2 months, 4 weeks ago) by yuzu
File size: 9192 byte(s)
Log Message:
add start, stop, restart endpoints

File Contents

# Content
1 use schemars::JsonSchema;
2 use serde::{Deserialize, Serialize};
3 use tokio::{
4 io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
5 process::{Child, Command},
6 sync::{
7 Mutex,
8 mpsc::{Receiver, channel},
9 },
10 task::spawn,
11 };
12 use uuid::Uuid;
13
14 use std::{path::PathBuf, process::Stdio, sync::Arc};
15
// Serializable description of one managed service: what to run, with which
// arguments, in which directory, and under which identity.
//
// NOTE: plain `//` comments are used here on purpose. This type derives
// `JsonSchema`, and schemars copies `///` / `/** */` doc comments into the
// generated schema as descriptions, which would change the emitted schema.
#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug)]
pub struct ServiceConf {
    // Identifier of the service; the constructors that do not take one
    // generate it with `Uuid::new_v4()`.
    pub uuid: Uuid,
    // Name of the service; printed as a prefix in front of scanned output lines.
    pub name: String,
    // Program handed to `tokio::process::Command::new` when the service starts.
    pub command: String,
    // Optional argument string; `Service::start` splits it on whitespace, so
    // quoting is not interpreted.
    pub args: Option<String>,
    // Optional working directory for the spawned process.
    pub directory: Option<PathBuf>,
    // NOTE(review): not read anywhere in this file — presumably tells the
    // caller whether to start the service automatically; confirm at call sites.
    pub autostart: bool,
}
25 impl Default for ServiceConf {
26 fn default() -> Self {
27 Self::new()
28 }
29 }
30 impl ServiceConf {
31 /**
32 * Returns a new empty `ServiceConf`
33 */
34 pub fn new() -> Self {
35 Self {
36 uuid: Uuid::new_v4(),
37 name: String::new(),
38 command: String::new(),
39 args: None,
40 directory: None,
41 autostart: false,
42 }
43 }
44 /**
45 * Returns a new `ServiceConf` from parts.
46 */
47 pub fn from_parts(
48 uuid: Uuid,
49 name: String,
50 command: String,
51 args: Option<String>,
52 directory: Option<PathBuf>,
53 autostart: bool,
54 ) -> Self {
55 Self {
56 uuid,
57 name,
58 command,
59 args,
60 directory,
61 autostart,
62 }
63 }
64 /**
65 * Returns a new `ServiceConf` from parts with new uuid.
66 */
67 pub fn new_from_parts(
68 name: String,
69 command: String,
70 args: Option<String>,
71 directory: Option<PathBuf>,
72 autostart: bool,
73 ) -> Self {
74 Self {
75 uuid: Uuid::new_v4(),
76 name,
77 command,
78 args,
79 directory,
80 autostart,
81 }
82 }
83 }
84
/**
 * A configured service together with the handle of its child process, if one has been
 * spawned, and the receiving ends of its scanned output streams.
 */
#[derive(Debug)]
pub struct Service {
    /** Configuration this service runs with. */
    conf: ServiceConf,
    /** Child process handle; `Some` after a successful `start`, reset to `None` by `stop`. */
    proc: Option<Arc<Mutex<Child>>>,
    /** Lines read from the child's stdout; set by `scan_stdout`, cleared by `stop`. */
    pub stdout: Option<Arc<Mutex<Receiver<String>>>>,
    /** Lines read from the child's stderr; set by `scan_stderr`, cleared by `stop`. */
    pub stderr: Option<Arc<Mutex<Receiver<String>>>>,
}
92 impl Default for Service {
93 fn default() -> Self {
94 Self::new()
95 }
96 }
97 impl Service {
98 /**
99 * Returns a new empty `Service`
100 */
101 pub fn new() -> Self {
102 Self {
103 conf: ServiceConf::default(),
104 proc: None,
105 stdout: None,
106 stderr: None,
107 }
108 }
109 /**
110 * Returns a `Service` made from a `ServiceConf`.
111 */
112 pub fn from_conf(conf: &ServiceConf) -> Self {
113 Self {
114 conf: conf.clone(),
115 proc: None,
116 stdout: None,
117 stderr: None,
118 }
119 }
120 /**
121 * Returns the name of the service
122 */
123 pub async fn name(&self) -> &str {
124 &self.conf.name
125 }
126 /**
127 * Uses `tokio::process::Command` to start the service.
128 */
129 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> {
130 if self.proc.is_some() {
131 return Err(Box::new(std::io::Error::new(
132 std::io::ErrorKind::AlreadyExists,
133 "Process Already Exists",
134 )));
135 }
136 let cmd = &self.conf.command;
137 let mut proc = Command::new(cmd);
138 proc.stdin(Stdio::piped());
139 proc.stdout(Stdio::piped());
140 proc.stderr(Stdio::piped());
141 if let Some(a) = &self.conf.args {
142 proc.args(a.split_whitespace());
143 };
144 if let Some(c) = &self.conf.directory {
145 proc.current_dir(c);
146 };
147 let child = proc.spawn()?;
148 self.proc = Some(Arc::new(Mutex::new(child)));
149 Ok(())
150 }
151 /**
152 * Calls self.start(), then self.scan_stdout(), and finally self.scan_stderr()
153 */
154 #[inline]
155 pub async fn start_with_output(&mut self) -> Result<(), Box<dyn std::error::Error>> {
156 self.start().await?;
157 self.scan_stdout().await?;
158 self.scan_stderr().await?;
159 Ok(())
160 }
161 //TODO: process monitoring!
162 /**
163 * Returns true when process is started and false when process is stopped.
164 */
165 pub async fn started(&self) -> bool {
166 self.proc.is_some()
167 }
168 /**
169 * Invokes kill on the service process
170 */
171 pub async fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> {
172 if let Some(proc) = self.proc.clone() {
173 let mut lock = proc.lock().await;
174 lock.kill().await?;
175 drop(lock);
176 self.proc = None;
177 self.stdout = None;
178 self.stderr = None;
179 Ok(())
180 } else {
181 Err(Box::new(std::io::Error::new(
182 std::io::ErrorKind::NotFound,
183 "No Process Associated with Service",
184 )))
185 }
186 }
187 /**
188 * Restarts service process
189 */
190 #[inline]
191 pub async fn restart(&mut self) -> Result<(), Box<dyn std::error::Error>> {
192 self.stop().await?;
193 self.start().await?;
194 Ok(())
195 }
196 /**
197 * Restarts service process
198 */
199 #[inline]
200 pub async fn restart_with_output(&mut self) -> Result<(), Box<dyn std::error::Error>> {
201 self.stop().await?;
202 self.start_with_output().await?;
203 Ok(())
204 }
205 /**
206 * Takes control of service process' stdout file handle and spawns a new task to continuously
207 * scan it.
208 */
209 pub async fn scan_stdout(&mut self) -> Result<(), Box<dyn std::error::Error>> {
210 if let Some(proc) = self.proc.clone() {
211 let mut lock = proc.lock().await;
212 let stdout = if let Some(stdout) = lock.stdout.take() {
213 stdout
214 } else {
215 return Err(Box::new(std::io::Error::new(
216 std::io::ErrorKind::NotFound,
217 "No stdout handle associated with process",
218 )));
219 };
220 drop(lock);
221 let (tx, rx) = channel(1024);
222 let sname = self.conf.name.clone();
223 let suuid = self.conf.uuid.clone();
224 spawn(async move {
225 let mut br = BufReader::new(stdout).lines();
226 while let Ok(Some(line)) = br.next_line().await {
227 println!("{} ({}) :: {}", &suuid, &sname, &line);
228 if let Err(_) = tx.send(line).await {
229 return;
230 };
231 }
232 });
233 self.stdout = Some(Arc::new(Mutex::new(rx)));
234 Ok(())
235 } else {
236 Err(Box::new(std::io::Error::new(
237 std::io::ErrorKind::NotFound,
238 "No Process Associated with Service",
239 )))
240 }
241 }
242 /**
243 * Takes control of service process' stderr file handle and spawns a new task to continuously
244 * scan it.
245 */
246 pub async fn scan_stderr(&mut self) -> Result<(), Box<dyn std::error::Error>> {
247 if let Some(proc) = self.proc.clone() {
248 let mut lock = proc.lock().await;
249 let stderr = if let Some(stderr) = lock.stderr.take() {
250 stderr
251 } else {
252 return Err(Box::new(std::io::Error::new(
253 std::io::ErrorKind::NotFound,
254 "No stderr handle associated with process",
255 )));
256 };
257 drop(lock);
258 let (tx, rx) = channel(1024);
259 let sname = self.conf.name.clone();
260 let suuid = self.conf.uuid.clone();
261 spawn(async move {
262 let mut br = BufReader::new(stderr).lines();
263 while let Ok(Some(line)) = br.next_line().await {
264 eprintln!("{} ({}) >< {}", &suuid, &sname, &line);
265 if let Err(_) = tx.send(line).await {
266 return;
267 };
268 }
269 });
270 self.stderr = Some(Arc::new(Mutex::new(rx)));
271 Ok(())
272 } else {
273 Err(Box::new(std::io::Error::new(
274 std::io::ErrorKind::NotFound,
275 "No Process Associated with Service",
276 )))
277 }
278 }
279 /**
280 * Writes to the service process' stdin, if it exists.
281 */
282 pub async fn write_stdin(&mut self, buf: String) -> Result<(), Box<dyn std::error::Error>> {
283 if let Some(proc) = self.proc.clone() {
284 let mut lock = proc.lock().await;
285 let stdin = if let Some(stdin) = lock.stdin.as_mut() {
286 stdin
287 } else {
288 return Err(Box::new(std::io::Error::new(
289 std::io::ErrorKind::NotFound,
290 "No stdin handle associated with process",
291 )));
292 };
293 stdin.write(&buf.as_bytes()).await?;
294 stdin.flush().await?;
295 Ok(())
296 } else {
297 Err(Box::new(std::io::Error::new(
298 std::io::ErrorKind::NotFound,
299 "No Process Associated with Service",
300 )))
301 }
302 }
303 /**
304 * Writes a line to the service process' stdin, if it exists.
305 */
306 #[inline]
307 pub async fn writeln_stdin(&mut self, buf: String) -> Result<(), Box<dyn std::error::Error>> {
308 self.write_stdin(format!("{}\n", buf)).await
309 }
310 }