ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/salaryman/trunk/src/service.rs
Revision: 14
Committed: Sat Jul 12 06:17:38 2025 UTC (2 months, 4 weeks ago) by yuzu
File size: 9192 byte(s)
Log Message:
add start, stop, restart endpoints

File Contents

# User Rev Content
1 yuzu 13 use schemars::JsonSchema;
2 yuzu 7 use serde::{Deserialize, Serialize};
3 yuzu 8 use tokio::{
4     io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
5     process::{Child, Command},
6     sync::{
7     Mutex,
8     mpsc::{Receiver, channel},
9     },
10     task::spawn,
11 yuzu 7 };
12 yuzu 13 use uuid::Uuid;
13 yuzu 7
14 yuzu 8 use std::{path::PathBuf, process::Stdio, sync::Arc};
15    
16 yuzu 13 #[derive(Serialize, Deserialize, JsonSchema, Clone, Debug)]
17 yuzu 7 pub struct ServiceConf {
18 yuzu 13 pub uuid: Uuid,
19 yuzu 7 pub name: String,
20 yuzu 8 pub command: String,
21     pub args: Option<String>,
22     pub directory: Option<PathBuf>,
23 yuzu 7 pub autostart: bool,
24     }
25 yuzu 8 impl Default for ServiceConf {
26     fn default() -> Self {
27     Self::new()
28     }
29     }
30 yuzu 7 impl ServiceConf {
31 yuzu 8 /**
32     * Returns a new empty `ServiceConf`
33     */
34 yuzu 7 pub fn new() -> Self {
35     Self {
36 yuzu 13 uuid: Uuid::new_v4(),
37 yuzu 7 name: String::new(),
38     command: String::new(),
39     args: None,
40     directory: None,
41     autostart: false,
42     }
43     }
44 yuzu 8 /**
45     * Returns a new `ServiceConf` from parts.
46     */
47 yuzu 7 pub fn from_parts(
48 yuzu 13 uuid: Uuid,
49 yuzu 7 name: String,
50     command: String,
51     args: Option<String>,
52     directory: Option<PathBuf>,
53     autostart: bool,
54     ) -> Self {
55     Self {
56 yuzu 13 uuid,
57 yuzu 7 name,
58     command,
59     args,
60     directory,
61     autostart,
62     }
63     }
64 yuzu 13 /**
65     * Returns a new `ServiceConf` from parts with new uuid.
66     */
67     pub fn new_from_parts(
68     name: String,
69     command: String,
70     args: Option<String>,
71     directory: Option<PathBuf>,
72     autostart: bool,
73     ) -> Self {
74     Self {
75     uuid: Uuid::new_v4(),
76     name,
77     command,
78     args,
79     directory,
80     autostart,
81     }
82     }
83 yuzu 7 }
84    
85     #[derive(Debug)]
86     pub struct Service {
87 yuzu 8 conf: ServiceConf,
88     proc: Option<Arc<Mutex<Child>>>,
89 yuzu 9 pub stdout: Option<Arc<Mutex<Receiver<String>>>>,
90     pub stderr: Option<Arc<Mutex<Receiver<String>>>>,
91 yuzu 7 }
92 yuzu 8 impl Default for Service {
93     fn default() -> Self {
94     Self::new()
95     }
96     }
97 yuzu 7 impl Service {
98 yuzu 8 /**
99     * Returns a new empty `Service`
100     */
101 yuzu 7 pub fn new() -> Self {
102     Self {
103 yuzu 8 conf: ServiceConf::default(),
104     proc: None,
105     stdout: None,
106     stderr: None,
107 yuzu 7 }
108     }
109 yuzu 8 /**
110     * Returns a `Service` made from a `ServiceConf`.
111     */
112     pub fn from_conf(conf: &ServiceConf) -> Self {
113 yuzu 7 Self {
114 yuzu 8 conf: conf.clone(),
115     proc: None,
116     stdout: None,
117     stderr: None,
118 yuzu 7 }
119     }
120 yuzu 8 /**
121     * Returns the name of the service
122     */
123     pub async fn name(&self) -> &str {
124     &self.conf.name
125     }
126     /**
127     * Uses `tokio::process::Command` to start the service.
128     */
129     pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> {
130     if self.proc.is_some() {
131 yuzu 7 return Err(Box::new(std::io::Error::new(
132     std::io::ErrorKind::AlreadyExists,
133 yuzu 8 "Process Already Exists",
134 yuzu 7 )));
135     }
136 yuzu 8 let cmd = &self.conf.command;
137 yuzu 9 let mut proc = Command::new(cmd);
138     proc.stdin(Stdio::piped());
139     proc.stdout(Stdio::piped());
140     proc.stderr(Stdio::piped());
141     if let Some(a) = &self.conf.args {
142     proc.args(a.split_whitespace());
143 yuzu 7 };
144 yuzu 9 if let Some(c) = &self.conf.directory {
145     proc.current_dir(c);
146 yuzu 7 };
147 yuzu 9 let child = proc.spawn()?;
148 yuzu 8 self.proc = Some(Arc::new(Mutex::new(child)));
149 yuzu 7 Ok(())
150     }
151 yuzu 8 /**
152 yuzu 14 * Calls self.start(), then self.scan_stdout(), and finally self.scan_stderr()
153     */
154     #[inline]
155     pub async fn start_with_output(&mut self) -> Result<(), Box<dyn std::error::Error>> {
156     self.start().await?;
157     self.scan_stdout().await?;
158     self.scan_stderr().await?;
159     Ok(())
160     }
161     //TODO: process monitoring!
162     /**
163 yuzu 8 * Returns true when process is started and false when process is stopped.
164     */
165     pub async fn started(&self) -> bool {
166     self.proc.is_some()
167     }
168     /**
169     * Invokes kill on the service process
170     */
171     pub async fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> {
172     if let Some(proc) = self.proc.clone() {
173     let mut lock = proc.lock().await;
174     lock.kill().await?;
175     drop(lock);
176     self.proc = None;
177 yuzu 12 self.stdout = None;
178     self.stderr = None;
179 yuzu 8 Ok(())
180     } else {
181     Err(Box::new(std::io::Error::new(
182     std::io::ErrorKind::NotFound,
183     "No Process Associated with Service",
184     )))
185     }
186     }
187     /**
188     * Restarts service process
189     */
190 yuzu 14 #[inline]
191 yuzu 8 pub async fn restart(&mut self) -> Result<(), Box<dyn std::error::Error>> {
192     self.stop().await?;
193     self.start().await?;
194     Ok(())
195     }
196     /**
197 yuzu 14 * Restarts service process
198     */
199     #[inline]
200     pub async fn restart_with_output(&mut self) -> Result<(), Box<dyn std::error::Error>> {
201     self.stop().await?;
202     self.start_with_output().await?;
203     Ok(())
204     }
205     /**
206 yuzu 8 * Takes control of service process' stdout file handle and spawns a new task to continuously
207     * scan it.
208     */
209     pub async fn scan_stdout(&mut self) -> Result<(), Box<dyn std::error::Error>> {
210     if let Some(proc) = self.proc.clone() {
211     let mut lock = proc.lock().await;
212     let stdout = if let Some(stdout) = lock.stdout.take() {
213     stdout
214 yuzu 7 } else {
215     return Err(Box::new(std::io::Error::new(
216 yuzu 8 std::io::ErrorKind::NotFound,
217     "No stdout handle associated with process",
218     )));
219     };
220     drop(lock);
221 yuzu 12 let (tx, rx) = channel(1024);
222 yuzu 8 let sname = self.conf.name.clone();
223 yuzu 13 let suuid = self.conf.uuid.clone();
224 yuzu 8 spawn(async move {
225     let mut br = BufReader::new(stdout).lines();
226     while let Ok(Some(line)) = br.next_line().await {
227 yuzu 13 println!("{} ({}) :: {}", &suuid, &sname, &line);
228 yuzu 8 if let Err(_) = tx.send(line).await {
229     return;
230     };
231     }
232     });
233     self.stdout = Some(Arc::new(Mutex::new(rx)));
234     Ok(())
235 yuzu 7 } else {
236 yuzu 8 Err(Box::new(std::io::Error::new(
237 yuzu 7 std::io::ErrorKind::NotFound,
238 yuzu 8 "No Process Associated with Service",
239     )))
240     }
241 yuzu 7 }
242 yuzu 8 /**
243     * Takes control of service process' stderr file handle and spawns a new task to continuously
244     * scan it.
245     */
246     pub async fn scan_stderr(&mut self) -> Result<(), Box<dyn std::error::Error>> {
247     if let Some(proc) = self.proc.clone() {
248     let mut lock = proc.lock().await;
249     let stderr = if let Some(stderr) = lock.stderr.take() {
250     stderr
251     } else {
252     return Err(Box::new(std::io::Error::new(
253     std::io::ErrorKind::NotFound,
254     "No stderr handle associated with process",
255     )));
256     };
257     drop(lock);
258 yuzu 12 let (tx, rx) = channel(1024);
259 yuzu 8 let sname = self.conf.name.clone();
260 yuzu 13 let suuid = self.conf.uuid.clone();
261 yuzu 8 spawn(async move {
262     let mut br = BufReader::new(stderr).lines();
263     while let Ok(Some(line)) = br.next_line().await {
264 yuzu 13 eprintln!("{} ({}) >< {}", &suuid, &sname, &line);
265 yuzu 8 if let Err(_) = tx.send(line).await {
266     return;
267     };
268 yuzu 7 }
269 yuzu 8 });
270     self.stderr = Some(Arc::new(Mutex::new(rx)));
271     Ok(())
272 yuzu 7 } else {
273     Err(Box::new(std::io::Error::new(
274     std::io::ErrorKind::NotFound,
275 yuzu 8 "No Process Associated with Service",
276 yuzu 7 )))
277     }
278     }
279 yuzu 8 /**
280     * Writes to the service process' stdin, if it exists.
281     */
282     pub async fn write_stdin(&mut self, buf: String) -> Result<(), Box<dyn std::error::Error>> {
283     if let Some(proc) = self.proc.clone() {
284     let mut lock = proc.lock().await;
285     let stdin = if let Some(stdin) = lock.stdin.as_mut() {
286     stdin
287 yuzu 7 } else {
288 yuzu 8 return Err(Box::new(std::io::Error::new(
289     std::io::ErrorKind::NotFound,
290     "No stdin handle associated with process",
291     )));
292     };
293     stdin.write(&buf.as_bytes()).await?;
294 yuzu 9 stdin.flush().await?;
295 yuzu 8 Ok(())
296 yuzu 7 } else {
297     Err(Box::new(std::io::Error::new(
298     std::io::ErrorKind::NotFound,
299 yuzu 8 "No Process Associated with Service",
300 yuzu 7 )))
301     }
302     }
303 yuzu 13 /**
304     * Writes a line to the service process' stdin, if it exists.
305     */
306     #[inline]
307     pub async fn writeln_stdin(&mut self, buf: String) -> Result<(), Box<dyn std::error::Error>> {
308     self.write_stdin(format!("{}\n", buf)).await
309     }
310 yuzu 7 }