ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/salaryman/trunk/src/service.rs
Revision: 12
Committed: Wed Jul 9 05:32:04 2025 UTC (3 months ago) by yuzu
File size: 7627 byte(s)
Log Message:
fix bug in service::Service; run fmt

File Contents

# User Rev Content
1 yuzu 7 use serde::{Deserialize, Serialize};
2 yuzu 8 use tokio::{
3     io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
4     process::{Child, Command},
5     sync::{
6     Mutex,
7     mpsc::{Receiver, channel},
8     },
9     task::spawn,
10 yuzu 7 };
11    
12 yuzu 8 use std::{path::PathBuf, process::Stdio, sync::Arc};
13    
/**
 * Configuration describing a single service: what to run, with which
 * arguments, from which directory. Serializable and deserializable via serde.
 */
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ServiceConf {
    /** Name of the service; returned by `Service::name` and used as the prefix of scanned output lines. */
    pub name: String,
    /** Program handed to `tokio::process::Command::new` when the service is started. */
    pub command: String,
    /** Optional argument string; `Service::start` splits it on whitespace into individual arguments. */
    pub args: Option<String>,
    /** Optional working directory applied to the child process when the service is started. */
    pub directory: Option<PathBuf>,
    /** NOTE(review): not read anywhere in this file; presumably tells the caller whether to start the service automatically - confirm. */
    pub autostart: bool,
}
22 yuzu 8 impl Default for ServiceConf {
23     fn default() -> Self {
24     Self::new()
25     }
26     }
27 yuzu 7 impl ServiceConf {
28 yuzu 8 /**
29     * Returns a new empty `ServiceConf`
30     */
31 yuzu 7 pub fn new() -> Self {
32     Self {
33     name: String::new(),
34     command: String::new(),
35     args: None,
36     directory: None,
37     autostart: false,
38     }
39     }
40 yuzu 8 /**
41     * Returns a new `ServiceConf` from parts.
42     */
43 yuzu 7 pub fn from_parts(
44     name: String,
45     command: String,
46     args: Option<String>,
47     directory: Option<PathBuf>,
48     autostart: bool,
49     ) -> Self {
50     Self {
51     name,
52     command,
53     args,
54     directory,
55     autostart,
56     }
57     }
58     }
59    
/**
 * A service: its configuration plus, while it is running, the handle of the
 * spawned child process and the receivers for its scanned output.
 */
#[derive(Debug)]
pub struct Service {
    /** Configuration the service was created from. */
    conf: ServiceConf,
    /** Child process handle; set by `start`, reset to `None` by `stop`. */
    proc: Option<Arc<Mutex<Child>>>,
    /** Receiver of stdout lines; set by `scan_stdout`, reset to `None` by `stop`. */
    pub stdout: Option<Arc<Mutex<Receiver<String>>>>,
    /** Receiver of stderr lines; set by `scan_stderr`, reset to `None` by `stop`. */
    pub stderr: Option<Arc<Mutex<Receiver<String>>>>,
}
67 yuzu 8 impl Default for Service {
68     fn default() -> Self {
69     Self::new()
70     }
71     }
72 yuzu 7 impl Service {
73 yuzu 8 /**
74     * Returns a new empty `Service`
75     */
76 yuzu 7 pub fn new() -> Self {
77     Self {
78 yuzu 8 conf: ServiceConf::default(),
79     proc: None,
80     stdout: None,
81     stderr: None,
82 yuzu 7 }
83     }
84 yuzu 8 /**
85     * Returns a `Service` made from a `ServiceConf`.
86     */
87     pub fn from_conf(conf: &ServiceConf) -> Self {
88 yuzu 7 Self {
89 yuzu 8 conf: conf.clone(),
90     proc: None,
91     stdout: None,
92     stderr: None,
93 yuzu 7 }
94     }
95 yuzu 8 /**
96     * Returns the name of the service
97     */
98     pub async fn name(&self) -> &str {
99     &self.conf.name
100     }
101     /**
102     * Uses `tokio::process::Command` to start the service.
103     */
104     pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> {
105     if self.proc.is_some() {
106 yuzu 7 return Err(Box::new(std::io::Error::new(
107     std::io::ErrorKind::AlreadyExists,
108 yuzu 8 "Process Already Exists",
109 yuzu 7 )));
110     }
111 yuzu 8 let cmd = &self.conf.command;
112 yuzu 9 let mut proc = Command::new(cmd);
113     proc.stdin(Stdio::piped());
114     proc.stdout(Stdio::piped());
115     proc.stderr(Stdio::piped());
116     if let Some(a) = &self.conf.args {
117     proc.args(a.split_whitespace());
118 yuzu 7 };
119 yuzu 9 if let Some(c) = &self.conf.directory {
120     proc.current_dir(c);
121 yuzu 7 };
122 yuzu 9 let child = proc.spawn()?;
123 yuzu 8 self.proc = Some(Arc::new(Mutex::new(child)));
124 yuzu 7 Ok(())
125     }
126 yuzu 8 /**
127     * Returns true when process is started and false when process is stopped.
128     */
129     pub async fn started(&self) -> bool {
130     self.proc.is_some()
131     }
132     /**
133     * Invokes kill on the service process
134     */
135     pub async fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> {
136     if let Some(proc) = self.proc.clone() {
137     let mut lock = proc.lock().await;
138     lock.kill().await?;
139     drop(lock);
140     self.proc = None;
141 yuzu 12 self.stdout = None;
142     self.stderr = None;
143 yuzu 8 Ok(())
144     } else {
145     Err(Box::new(std::io::Error::new(
146     std::io::ErrorKind::NotFound,
147     "No Process Associated with Service",
148     )))
149     }
150     }
151     /**
152     * Restarts service process
153     */
154     pub async fn restart(&mut self) -> Result<(), Box<dyn std::error::Error>> {
155     self.stop().await?;
156     self.start().await?;
157     Ok(())
158     }
159     /**
160     * Takes control of service process' stdout file handle and spawns a new task to continuously
161     * scan it.
162     */
163     pub async fn scan_stdout(&mut self) -> Result<(), Box<dyn std::error::Error>> {
164     if let Some(proc) = self.proc.clone() {
165     let mut lock = proc.lock().await;
166     let stdout = if let Some(stdout) = lock.stdout.take() {
167     stdout
168 yuzu 7 } else {
169     return Err(Box::new(std::io::Error::new(
170 yuzu 8 std::io::ErrorKind::NotFound,
171     "No stdout handle associated with process",
172     )));
173     };
174     drop(lock);
175 yuzu 12 let (tx, rx) = channel(1024);
176 yuzu 8 let sname = self.conf.name.clone();
177     spawn(async move {
178     let mut br = BufReader::new(stdout).lines();
179     while let Ok(Some(line)) = br.next_line().await {
180     println!("{} :: {}", &sname, &line);
181     if let Err(_) = tx.send(line).await {
182     return;
183     };
184     }
185     });
186     self.stdout = Some(Arc::new(Mutex::new(rx)));
187     Ok(())
188 yuzu 7 } else {
189 yuzu 8 Err(Box::new(std::io::Error::new(
190 yuzu 7 std::io::ErrorKind::NotFound,
191 yuzu 8 "No Process Associated with Service",
192     )))
193     }
194 yuzu 7 }
195 yuzu 8 /**
196     * Takes control of service process' stderr file handle and spawns a new task to continuously
197     * scan it.
198     */
199     pub async fn scan_stderr(&mut self) -> Result<(), Box<dyn std::error::Error>> {
200     if let Some(proc) = self.proc.clone() {
201     let mut lock = proc.lock().await;
202     let stderr = if let Some(stderr) = lock.stderr.take() {
203     stderr
204     } else {
205     return Err(Box::new(std::io::Error::new(
206     std::io::ErrorKind::NotFound,
207     "No stderr handle associated with process",
208     )));
209     };
210     drop(lock);
211 yuzu 12 let (tx, rx) = channel(1024);
212 yuzu 8 let sname = self.conf.name.clone();
213     spawn(async move {
214     let mut br = BufReader::new(stderr).lines();
215     while let Ok(Some(line)) = br.next_line().await {
216 yuzu 9 eprintln!("{} :: {}", &sname, &line);
217 yuzu 8 if let Err(_) = tx.send(line).await {
218     return;
219     };
220 yuzu 7 }
221 yuzu 8 });
222     self.stderr = Some(Arc::new(Mutex::new(rx)));
223     Ok(())
224 yuzu 7 } else {
225     Err(Box::new(std::io::Error::new(
226     std::io::ErrorKind::NotFound,
227 yuzu 8 "No Process Associated with Service",
228 yuzu 7 )))
229     }
230     }
231 yuzu 8 /**
232     * Writes to the service process' stdin, if it exists.
233     */
234     pub async fn write_stdin(&mut self, buf: String) -> Result<(), Box<dyn std::error::Error>> {
235     if let Some(proc) = self.proc.clone() {
236     let mut lock = proc.lock().await;
237     let stdin = if let Some(stdin) = lock.stdin.as_mut() {
238     stdin
239 yuzu 7 } else {
240 yuzu 8 return Err(Box::new(std::io::Error::new(
241     std::io::ErrorKind::NotFound,
242     "No stdin handle associated with process",
243     )));
244     };
245     stdin.write(&buf.as_bytes()).await?;
246 yuzu 9 stdin.flush().await?;
247 yuzu 8 Ok(())
248 yuzu 7 } else {
249     Err(Box::new(std::io::Error::new(
250     std::io::ErrorKind::NotFound,
251 yuzu 8 "No Process Associated with Service",
252 yuzu 7 )))
253     }
254     }
255     }