ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/salaryman/trunk/src/service.rs
Revision: 8
Committed: Tue Jul 8 01:49:14 2025 UTC (3 months ago) by yuzu
Original Path: trunk/src/model.rs
File size: 7650 byte(s)
Log Message:
go fully async

File Contents

# User Rev Content
1 yuzu 7 use serde::{Deserialize, Serialize};
2 yuzu 8 use tokio::{
3     io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
4     process::{Child, Command},
5     sync::{
6     Mutex,
7     mpsc::{Receiver, channel},
8     },
9     task::spawn,
10 yuzu 7 };
11    
12 yuzu 8 use std::{path::PathBuf, process::Stdio, sync::Arc};
13    
14 yuzu 7 #[derive(Serialize, Deserialize, Clone, Debug)]
15     pub struct ServiceConf {
16     pub name: String,
17 yuzu 8 pub command: String,
18     pub args: Option<String>,
19     pub directory: Option<PathBuf>,
20 yuzu 7 pub autostart: bool,
21     }
22 yuzu 8 impl Default for ServiceConf {
23     fn default() -> Self {
24     Self::new()
25     }
26     }
27 yuzu 7 impl ServiceConf {
28 yuzu 8 /**
29     * Returns a new empty `ServiceConf`
30     */
31 yuzu 7 pub fn new() -> Self {
32     Self {
33     name: String::new(),
34     command: String::new(),
35     args: None,
36     directory: None,
37     autostart: false,
38     }
39     }
40 yuzu 8 /**
41     * Returns a new `ServiceConf` from parts.
42     */
43 yuzu 7 pub fn from_parts(
44     name: String,
45     command: String,
46     args: Option<String>,
47     directory: Option<PathBuf>,
48     autostart: bool,
49     ) -> Self {
50     Self {
51     name,
52     command,
53     args,
54     directory,
55     autostart,
56     }
57     }
58     }
59    
60     #[derive(Debug)]
61     pub struct Service {
62 yuzu 8 conf: ServiceConf,
63     proc: Option<Arc<Mutex<Child>>>,
64     stdout: Option<Arc<Mutex<Receiver<String>>>>,
65     stderr: Option<Arc<Mutex<Receiver<String>>>>,
66 yuzu 7 }
67 yuzu 8 impl Default for Service {
68     fn default() -> Self {
69     Self::new()
70     }
71     }
72 yuzu 7 impl Service {
73 yuzu 8 /**
74     * Returns a new empty `Service`
75     */
76 yuzu 7 pub fn new() -> Self {
77     Self {
78 yuzu 8 conf: ServiceConf::default(),
79     proc: None,
80     stdout: None,
81     stderr: None,
82 yuzu 7 }
83     }
84 yuzu 8 /**
85     * Returns a `Service` made from a `ServiceConf`.
86     */
87     pub fn from_conf(conf: &ServiceConf) -> Self {
88 yuzu 7 Self {
89 yuzu 8 conf: conf.clone(),
90     proc: None,
91     stdout: None,
92     stderr: None,
93 yuzu 7 }
94     }
95 yuzu 8 /**
96     * Returns the name of the service
97     */
98     pub async fn name(&self) -> &str {
99     &self.conf.name
100     }
101     /**
102     * Uses `tokio::process::Command` to start the service.
103     */
104     pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> {
105     if self.proc.is_some() {
106 yuzu 7 return Err(Box::new(std::io::Error::new(
107     std::io::ErrorKind::AlreadyExists,
108 yuzu 8 "Process Already Exists",
109 yuzu 7 )));
110     }
111 yuzu 8 let cmd = &self.conf.command;
112     let args = if let Some(a) = &self.conf.args {
113     a.split_whitespace()
114 yuzu 7 } else {
115 yuzu 8 "".split_whitespace()
116 yuzu 7 };
117 yuzu 8 let cwd = if let Some(c) = &self.conf.directory {
118     c
119 yuzu 7 } else {
120 yuzu 8 &PathBuf::from("/")
121 yuzu 7 };
122 yuzu 8 let child = Command::new(cmd)
123     .args(args)
124     .current_dir(cwd)
125     .stdin(Stdio::piped())
126     .stdout(Stdio::piped())
127     .stderr(Stdio::piped())
128     .spawn()?;
129     self.proc = Some(Arc::new(Mutex::new(child)));
130 yuzu 7 Ok(())
131     }
132 yuzu 8 /**
133     * Returns true when process is started and false when process is stopped.
134     */
135     pub async fn started(&self) -> bool {
136     self.proc.is_some()
137     }
138     /**
139     * Invokes kill on the service process
140     */
141     pub async fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> {
142     if let Some(proc) = self.proc.clone() {
143     let mut lock = proc.lock().await;
144     lock.kill().await?;
145     drop(lock);
146     self.proc = None;
147     Ok(())
148     } else {
149     Err(Box::new(std::io::Error::new(
150     std::io::ErrorKind::NotFound,
151     "No Process Associated with Service",
152     )))
153     }
154     }
155     /**
156     * Restarts service process
157     */
158     pub async fn restart(&mut self) -> Result<(), Box<dyn std::error::Error>> {
159     self.stop().await?;
160     self.start().await?;
161     Ok(())
162     }
163     /**
164     * Takes control of service process' stdout file handle and spawns a new task to continuously
165     * scan it.
166     */
167     pub async fn scan_stdout(&mut self) -> Result<(), Box<dyn std::error::Error>> {
168     if let Some(proc) = self.proc.clone() {
169     let mut lock = proc.lock().await;
170     let stdout = if let Some(stdout) = lock.stdout.take() {
171     stdout
172 yuzu 7 } else {
173     return Err(Box::new(std::io::Error::new(
174 yuzu 8 std::io::ErrorKind::NotFound,
175     "No stdout handle associated with process",
176     )));
177     };
178     drop(lock);
179     let (tx, rx) = channel(100);
180     let sname = self.conf.name.clone();
181     spawn(async move {
182     let mut br = BufReader::new(stdout).lines();
183     while let Ok(Some(line)) = br.next_line().await {
184     println!("{} :: {}", &sname, &line);
185     if let Err(_) = tx.send(line).await {
186     return;
187     };
188     }
189     });
190     self.stdout = Some(Arc::new(Mutex::new(rx)));
191     Ok(())
192 yuzu 7 } else {
193 yuzu 8 Err(Box::new(std::io::Error::new(
194 yuzu 7 std::io::ErrorKind::NotFound,
195 yuzu 8 "No Process Associated with Service",
196     )))
197     }
198 yuzu 7 }
199 yuzu 8 /**
200     * Takes control of service process' stderr file handle and spawns a new task to continuously
201     * scan it.
202     */
203     pub async fn scan_stderr(&mut self) -> Result<(), Box<dyn std::error::Error>> {
204     if let Some(proc) = self.proc.clone() {
205     let mut lock = proc.lock().await;
206     let stderr = if let Some(stderr) = lock.stderr.take() {
207     stderr
208     } else {
209     return Err(Box::new(std::io::Error::new(
210     std::io::ErrorKind::NotFound,
211     "No stderr handle associated with process",
212     )));
213     };
214     drop(lock);
215     let (tx, rx) = channel(100);
216     let sname = self.conf.name.clone();
217     spawn(async move {
218     let mut br = BufReader::new(stderr).lines();
219     while let Ok(Some(line)) = br.next_line().await {
220     println!("ERR :: {} :: {}", &sname, &line);
221     if let Err(_) = tx.send(line).await {
222     return;
223     };
224 yuzu 7 }
225 yuzu 8 });
226     self.stderr = Some(Arc::new(Mutex::new(rx)));
227     Ok(())
228 yuzu 7 } else {
229     Err(Box::new(std::io::Error::new(
230     std::io::ErrorKind::NotFound,
231 yuzu 8 "No Process Associated with Service",
232 yuzu 7 )))
233     }
234     }
235 yuzu 8 /**
236     * Writes to the service process' stdin, if it exists.
237     */
238     pub async fn write_stdin(&mut self, buf: String) -> Result<(), Box<dyn std::error::Error>> {
239     if let Some(proc) = self.proc.clone() {
240     let mut lock = proc.lock().await;
241     let stdin = if let Some(stdin) = lock.stdin.as_mut() {
242     stdin
243 yuzu 7 } else {
244 yuzu 8 return Err(Box::new(std::io::Error::new(
245     std::io::ErrorKind::NotFound,
246     "No stdin handle associated with process",
247     )));
248     };
249     stdin.write(&buf.as_bytes()).await?;
250     Ok(())
251 yuzu 7 } else {
252     Err(Box::new(std::io::Error::new(
253     std::io::ErrorKind::NotFound,
254 yuzu 8 "No Process Associated with Service",
255 yuzu 7 )))
256     }
257     }
258     }