ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/salaryman/trunk/src/service.rs
Revision: 11
Committed: Wed Jul 9 05:14:23 2025 UTC (3 months ago) by yuzu
File size: 7561 byte(s)
Log Message:
reorganize

File Contents

# User Rev Content
1 yuzu 7 use serde::{Deserialize, Serialize};
2 yuzu 8 use tokio::{
3     io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
4     process::{Child, Command},
5     sync::{
6     Mutex,
7     mpsc::{Receiver, channel},
8     },
9     task::spawn,
10 yuzu 7 };
11    
12 yuzu 8 use std::{path::PathBuf, process::Stdio, sync::Arc};
13    
14 yuzu 7 #[derive(Serialize, Deserialize, Clone, Debug)]
15     pub struct ServiceConf {
16     pub name: String,
17 yuzu 8 pub command: String,
18     pub args: Option<String>,
19     pub directory: Option<PathBuf>,
20 yuzu 7 pub autostart: bool,
21     }
22 yuzu 8 impl Default for ServiceConf {
23     fn default() -> Self {
24     Self::new()
25     }
26     }
27 yuzu 7 impl ServiceConf {
28 yuzu 8 /**
29     * Returns a new empty `ServiceConf`
30     */
31 yuzu 7 pub fn new() -> Self {
32     Self {
33     name: String::new(),
34     command: String::new(),
35     args: None,
36     directory: None,
37     autostart: false,
38     }
39     }
40 yuzu 8 /**
41     * Returns a new `ServiceConf` from parts.
42     */
43 yuzu 7 pub fn from_parts(
44     name: String,
45     command: String,
46     args: Option<String>,
47     directory: Option<PathBuf>,
48     autostart: bool,
49     ) -> Self {
50     Self {
51     name,
52     command,
53     args,
54     directory,
55     autostart,
56     }
57     }
58     }
59    
60     #[derive(Debug)]
61     pub struct Service {
62 yuzu 8 conf: ServiceConf,
63     proc: Option<Arc<Mutex<Child>>>,
64 yuzu 9 pub stdout: Option<Arc<Mutex<Receiver<String>>>>,
65     pub stderr: Option<Arc<Mutex<Receiver<String>>>>,
66 yuzu 7 }
67 yuzu 8 impl Default for Service {
68     fn default() -> Self {
69     Self::new()
70     }
71     }
72 yuzu 7 impl Service {
73 yuzu 8 /**
74     * Returns a new empty `Service`
75     */
76 yuzu 7 pub fn new() -> Self {
77     Self {
78 yuzu 8 conf: ServiceConf::default(),
79     proc: None,
80     stdout: None,
81     stderr: None,
82 yuzu 7 }
83     }
84 yuzu 8 /**
85     * Returns a `Service` made from a `ServiceConf`.
86     */
87     pub fn from_conf(conf: &ServiceConf) -> Self {
88 yuzu 7 Self {
89 yuzu 8 conf: conf.clone(),
90     proc: None,
91     stdout: None,
92     stderr: None,
93 yuzu 7 }
94     }
95 yuzu 8 /**
96     * Returns the name of the service
97     */
98     pub async fn name(&self) -> &str {
99     &self.conf.name
100     }
101     /**
102     * Uses `tokio::process::Command` to start the service.
103     */
104     pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> {
105     if self.proc.is_some() {
106 yuzu 7 return Err(Box::new(std::io::Error::new(
107     std::io::ErrorKind::AlreadyExists,
108 yuzu 8 "Process Already Exists",
109 yuzu 7 )));
110     }
111 yuzu 8 let cmd = &self.conf.command;
112 yuzu 9 let mut proc = Command::new(cmd);
113     proc.stdin(Stdio::piped());
114     proc.stdout(Stdio::piped());
115     proc.stderr(Stdio::piped());
116     if let Some(a) = &self.conf.args {
117     proc.args(a.split_whitespace());
118 yuzu 7 };
119 yuzu 9 if let Some(c) = &self.conf.directory {
120     proc.current_dir(c);
121 yuzu 7 };
122 yuzu 9 let child = proc.spawn()?;
123 yuzu 8 self.proc = Some(Arc::new(Mutex::new(child)));
124 yuzu 7 Ok(())
125     }
126 yuzu 8 /**
127     * Returns true when process is started and false when process is stopped.
128     */
129     pub async fn started(&self) -> bool {
130     self.proc.is_some()
131     }
132     /**
133     * Invokes kill on the service process
134     */
135     pub async fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> {
136     if let Some(proc) = self.proc.clone() {
137     let mut lock = proc.lock().await;
138     lock.kill().await?;
139     drop(lock);
140     self.proc = None;
141     Ok(())
142     } else {
143     Err(Box::new(std::io::Error::new(
144     std::io::ErrorKind::NotFound,
145     "No Process Associated with Service",
146     )))
147     }
148     }
149     /**
150     * Restarts service process
151     */
152     pub async fn restart(&mut self) -> Result<(), Box<dyn std::error::Error>> {
153     self.stop().await?;
154     self.start().await?;
155     Ok(())
156     }
157     /**
158     * Takes control of service process' stdout file handle and spawns a new task to continuously
159     * scan it.
160     */
161     pub async fn scan_stdout(&mut self) -> Result<(), Box<dyn std::error::Error>> {
162     if let Some(proc) = self.proc.clone() {
163     let mut lock = proc.lock().await;
164     let stdout = if let Some(stdout) = lock.stdout.take() {
165     stdout
166 yuzu 7 } else {
167     return Err(Box::new(std::io::Error::new(
168 yuzu 8 std::io::ErrorKind::NotFound,
169     "No stdout handle associated with process",
170     )));
171     };
172     drop(lock);
173     let (tx, rx) = channel(100);
174     let sname = self.conf.name.clone();
175     spawn(async move {
176     let mut br = BufReader::new(stdout).lines();
177     while let Ok(Some(line)) = br.next_line().await {
178     println!("{} :: {}", &sname, &line);
179     if let Err(_) = tx.send(line).await {
180     return;
181     };
182     }
183     });
184     self.stdout = Some(Arc::new(Mutex::new(rx)));
185     Ok(())
186 yuzu 7 } else {
187 yuzu 8 Err(Box::new(std::io::Error::new(
188 yuzu 7 std::io::ErrorKind::NotFound,
189 yuzu 8 "No Process Associated with Service",
190     )))
191     }
192 yuzu 7 }
193 yuzu 8 /**
194     * Takes control of service process' stderr file handle and spawns a new task to continuously
195     * scan it.
196     */
197     pub async fn scan_stderr(&mut self) -> Result<(), Box<dyn std::error::Error>> {
198     if let Some(proc) = self.proc.clone() {
199     let mut lock = proc.lock().await;
200     let stderr = if let Some(stderr) = lock.stderr.take() {
201     stderr
202     } else {
203     return Err(Box::new(std::io::Error::new(
204     std::io::ErrorKind::NotFound,
205     "No stderr handle associated with process",
206     )));
207     };
208     drop(lock);
209     let (tx, rx) = channel(100);
210     let sname = self.conf.name.clone();
211     spawn(async move {
212     let mut br = BufReader::new(stderr).lines();
213     while let Ok(Some(line)) = br.next_line().await {
214 yuzu 9 eprintln!("{} :: {}", &sname, &line);
215 yuzu 8 if let Err(_) = tx.send(line).await {
216     return;
217     };
218 yuzu 7 }
219 yuzu 8 });
220     self.stderr = Some(Arc::new(Mutex::new(rx)));
221     Ok(())
222 yuzu 7 } else {
223     Err(Box::new(std::io::Error::new(
224     std::io::ErrorKind::NotFound,
225 yuzu 8 "No Process Associated with Service",
226 yuzu 7 )))
227     }
228     }
229 yuzu 8 /**
230     * Writes to the service process' stdin, if it exists.
231     */
232     pub async fn write_stdin(&mut self, buf: String) -> Result<(), Box<dyn std::error::Error>> {
233     if let Some(proc) = self.proc.clone() {
234     let mut lock = proc.lock().await;
235     let stdin = if let Some(stdin) = lock.stdin.as_mut() {
236     stdin
237 yuzu 7 } else {
238 yuzu 8 return Err(Box::new(std::io::Error::new(
239     std::io::ErrorKind::NotFound,
240     "No stdin handle associated with process",
241     )));
242     };
243     stdin.write(&buf.as_bytes()).await?;
244 yuzu 9 stdin.flush().await?;
245 yuzu 8 Ok(())
246 yuzu 7 } else {
247     Err(Box::new(std::io::Error::new(
248     std::io::ErrorKind::NotFound,
249 yuzu 8 "No Process Associated with Service",
250 yuzu 7 )))
251     }
252     }
253     }