ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/salaryman/trunk/src/service.rs
Revision: 13
Committed: Wed Jul 9 20:24:36 2025 UTC (3 months ago) by yuzu
File size: 8577 byte(s)
Log Message:
working network communication

File Contents

# User Rev Content
1 yuzu 13 use schemars::JsonSchema;
2 yuzu 7 use serde::{Deserialize, Serialize};
3 yuzu 8 use tokio::{
4     io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
5     process::{Child, Command},
6     sync::{
7     Mutex,
8     mpsc::{Receiver, channel},
9     },
10     task::spawn,
11 yuzu 7 };
12 yuzu 13 use uuid::Uuid;
13 yuzu 7
14 yuzu 8 use std::{path::PathBuf, process::Stdio, sync::Arc};
15    
16 yuzu 13 #[derive(Serialize, Deserialize, JsonSchema, Clone, Debug)]
17 yuzu 7 pub struct ServiceConf {
18 yuzu 13 pub uuid: Uuid,
19 yuzu 7 pub name: String,
20 yuzu 8 pub command: String,
21     pub args: Option<String>,
22     pub directory: Option<PathBuf>,
23 yuzu 7 pub autostart: bool,
24     }
25 yuzu 8 impl Default for ServiceConf {
26     fn default() -> Self {
27     Self::new()
28     }
29     }
30 yuzu 7 impl ServiceConf {
31 yuzu 8 /**
32     * Returns a new empty `ServiceConf`
33     */
34 yuzu 7 pub fn new() -> Self {
35     Self {
36 yuzu 13 uuid: Uuid::new_v4(),
37 yuzu 7 name: String::new(),
38     command: String::new(),
39     args: None,
40     directory: None,
41     autostart: false,
42     }
43     }
44 yuzu 8 /**
45     * Returns a new `ServiceConf` from parts.
46     */
47 yuzu 7 pub fn from_parts(
48 yuzu 13 uuid: Uuid,
49 yuzu 7 name: String,
50     command: String,
51     args: Option<String>,
52     directory: Option<PathBuf>,
53     autostart: bool,
54     ) -> Self {
55     Self {
56 yuzu 13 uuid,
57 yuzu 7 name,
58     command,
59     args,
60     directory,
61     autostart,
62     }
63     }
64 yuzu 13 /**
65     * Returns a new `ServiceConf` from parts with new uuid.
66     */
67     pub fn new_from_parts(
68     name: String,
69     command: String,
70     args: Option<String>,
71     directory: Option<PathBuf>,
72     autostart: bool,
73     ) -> Self {
74     Self {
75     uuid: Uuid::new_v4(),
76     name,
77     command,
78     args,
79     directory,
80     autostart,
81     }
82     }
83 yuzu 7 }
84    
85     #[derive(Debug)]
86     pub struct Service {
87 yuzu 8 conf: ServiceConf,
88     proc: Option<Arc<Mutex<Child>>>,
89 yuzu 9 pub stdout: Option<Arc<Mutex<Receiver<String>>>>,
90     pub stderr: Option<Arc<Mutex<Receiver<String>>>>,
91 yuzu 7 }
92 yuzu 8 impl Default for Service {
93     fn default() -> Self {
94     Self::new()
95     }
96     }
97 yuzu 7 impl Service {
98 yuzu 8 /**
99     * Returns a new empty `Service`
100     */
101 yuzu 7 pub fn new() -> Self {
102     Self {
103 yuzu 8 conf: ServiceConf::default(),
104     proc: None,
105     stdout: None,
106     stderr: None,
107 yuzu 7 }
108     }
109 yuzu 8 /**
110     * Returns a `Service` made from a `ServiceConf`.
111     */
112     pub fn from_conf(conf: &ServiceConf) -> Self {
113 yuzu 7 Self {
114 yuzu 8 conf: conf.clone(),
115     proc: None,
116     stdout: None,
117     stderr: None,
118 yuzu 7 }
119     }
120 yuzu 8 /**
121     * Returns the name of the service
122     */
123     pub async fn name(&self) -> &str {
124     &self.conf.name
125     }
126     /**
127     * Uses `tokio::process::Command` to start the service.
128     */
129     pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> {
130     if self.proc.is_some() {
131 yuzu 7 return Err(Box::new(std::io::Error::new(
132     std::io::ErrorKind::AlreadyExists,
133 yuzu 8 "Process Already Exists",
134 yuzu 7 )));
135     }
136 yuzu 8 let cmd = &self.conf.command;
137 yuzu 9 let mut proc = Command::new(cmd);
138     proc.stdin(Stdio::piped());
139     proc.stdout(Stdio::piped());
140     proc.stderr(Stdio::piped());
141     if let Some(a) = &self.conf.args {
142     proc.args(a.split_whitespace());
143 yuzu 7 };
144 yuzu 9 if let Some(c) = &self.conf.directory {
145     proc.current_dir(c);
146 yuzu 7 };
147 yuzu 9 let child = proc.spawn()?;
148 yuzu 8 self.proc = Some(Arc::new(Mutex::new(child)));
149 yuzu 7 Ok(())
150     }
151 yuzu 8 /**
152     * Returns true when process is started and false when process is stopped.
153     */
154     pub async fn started(&self) -> bool {
155     self.proc.is_some()
156     }
157     /**
158     * Invokes kill on the service process
159     */
160     pub async fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> {
161     if let Some(proc) = self.proc.clone() {
162     let mut lock = proc.lock().await;
163     lock.kill().await?;
164     drop(lock);
165     self.proc = None;
166 yuzu 12 self.stdout = None;
167     self.stderr = None;
168 yuzu 8 Ok(())
169     } else {
170     Err(Box::new(std::io::Error::new(
171     std::io::ErrorKind::NotFound,
172     "No Process Associated with Service",
173     )))
174     }
175     }
176     /**
177     * Restarts service process
178     */
179     pub async fn restart(&mut self) -> Result<(), Box<dyn std::error::Error>> {
180     self.stop().await?;
181     self.start().await?;
182     Ok(())
183     }
184     /**
185     * Takes control of service process' stdout file handle and spawns a new task to continuously
186     * scan it.
187     */
188     pub async fn scan_stdout(&mut self) -> Result<(), Box<dyn std::error::Error>> {
189     if let Some(proc) = self.proc.clone() {
190     let mut lock = proc.lock().await;
191     let stdout = if let Some(stdout) = lock.stdout.take() {
192     stdout
193 yuzu 7 } else {
194     return Err(Box::new(std::io::Error::new(
195 yuzu 8 std::io::ErrorKind::NotFound,
196     "No stdout handle associated with process",
197     )));
198     };
199     drop(lock);
200 yuzu 12 let (tx, rx) = channel(1024);
201 yuzu 8 let sname = self.conf.name.clone();
202 yuzu 13 let suuid = self.conf.uuid.clone();
203 yuzu 8 spawn(async move {
204     let mut br = BufReader::new(stdout).lines();
205     while let Ok(Some(line)) = br.next_line().await {
206 yuzu 13 println!("{} ({}) :: {}", &suuid, &sname, &line);
207 yuzu 8 if let Err(_) = tx.send(line).await {
208     return;
209     };
210     }
211     });
212     self.stdout = Some(Arc::new(Mutex::new(rx)));
213     Ok(())
214 yuzu 7 } else {
215 yuzu 8 Err(Box::new(std::io::Error::new(
216 yuzu 7 std::io::ErrorKind::NotFound,
217 yuzu 8 "No Process Associated with Service",
218     )))
219     }
220 yuzu 7 }
221 yuzu 8 /**
222     * Takes control of service process' stderr file handle and spawns a new task to continuously
223     * scan it.
224     */
225     pub async fn scan_stderr(&mut self) -> Result<(), Box<dyn std::error::Error>> {
226     if let Some(proc) = self.proc.clone() {
227     let mut lock = proc.lock().await;
228     let stderr = if let Some(stderr) = lock.stderr.take() {
229     stderr
230     } else {
231     return Err(Box::new(std::io::Error::new(
232     std::io::ErrorKind::NotFound,
233     "No stderr handle associated with process",
234     )));
235     };
236     drop(lock);
237 yuzu 12 let (tx, rx) = channel(1024);
238 yuzu 8 let sname = self.conf.name.clone();
239 yuzu 13 let suuid = self.conf.uuid.clone();
240 yuzu 8 spawn(async move {
241     let mut br = BufReader::new(stderr).lines();
242     while let Ok(Some(line)) = br.next_line().await {
243 yuzu 13 eprintln!("{} ({}) >< {}", &suuid, &sname, &line);
244 yuzu 8 if let Err(_) = tx.send(line).await {
245     return;
246     };
247 yuzu 7 }
248 yuzu 8 });
249     self.stderr = Some(Arc::new(Mutex::new(rx)));
250     Ok(())
251 yuzu 7 } else {
252     Err(Box::new(std::io::Error::new(
253     std::io::ErrorKind::NotFound,
254 yuzu 8 "No Process Associated with Service",
255 yuzu 7 )))
256     }
257     }
258 yuzu 8 /**
259     * Writes to the service process' stdin, if it exists.
260     */
261     pub async fn write_stdin(&mut self, buf: String) -> Result<(), Box<dyn std::error::Error>> {
262     if let Some(proc) = self.proc.clone() {
263     let mut lock = proc.lock().await;
264     let stdin = if let Some(stdin) = lock.stdin.as_mut() {
265     stdin
266 yuzu 7 } else {
267 yuzu 8 return Err(Box::new(std::io::Error::new(
268     std::io::ErrorKind::NotFound,
269     "No stdin handle associated with process",
270     )));
271     };
272     stdin.write(&buf.as_bytes()).await?;
273 yuzu 9 stdin.flush().await?;
274 yuzu 8 Ok(())
275 yuzu 7 } else {
276     Err(Box::new(std::io::Error::new(
277     std::io::ErrorKind::NotFound,
278 yuzu 8 "No Process Associated with Service",
279 yuzu 7 )))
280     }
281     }
282 yuzu 13 /**
283     * Writes a line to the service process' stdin, if it exists.
284     */
285     #[inline]
286     pub async fn writeln_stdin(&mut self, buf: String) -> Result<(), Box<dyn std::error::Error>> {
287     self.write_stdin(format!("{}\n", buf)).await
288     }
289 yuzu 7 }