ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/salaryman/trunk/src/service.rs
Revision: 12
Committed: Wed Jul 9 05:32:04 2025 UTC (3 months ago) by yuzu
File size: 7627 byte(s)
Log Message:
fix bug in service::Service; run fmt

File Contents

# Content
1 use serde::{Deserialize, Serialize};
2 use tokio::{
3 io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
4 process::{Child, Command},
5 sync::{
6 Mutex,
7 mpsc::{Receiver, channel},
8 },
9 task::spawn,
10 };
11
12 use std::{path::PathBuf, process::Stdio, sync::Arc};
13
14 #[derive(Serialize, Deserialize, Clone, Debug)]
15 pub struct ServiceConf {
16 pub name: String,
17 pub command: String,
18 pub args: Option<String>,
19 pub directory: Option<PathBuf>,
20 pub autostart: bool,
21 }
22 impl Default for ServiceConf {
23 fn default() -> Self {
24 Self::new()
25 }
26 }
27 impl ServiceConf {
28 /**
29 * Returns a new empty `ServiceConf`
30 */
31 pub fn new() -> Self {
32 Self {
33 name: String::new(),
34 command: String::new(),
35 args: None,
36 directory: None,
37 autostart: false,
38 }
39 }
40 /**
41 * Returns a new `ServiceConf` from parts.
42 */
43 pub fn from_parts(
44 name: String,
45 command: String,
46 args: Option<String>,
47 directory: Option<PathBuf>,
48 autostart: bool,
49 ) -> Self {
50 Self {
51 name,
52 command,
53 args,
54 directory,
55 autostart,
56 }
57 }
58 }
59
60 #[derive(Debug)]
61 pub struct Service {
62 conf: ServiceConf,
63 proc: Option<Arc<Mutex<Child>>>,
64 pub stdout: Option<Arc<Mutex<Receiver<String>>>>,
65 pub stderr: Option<Arc<Mutex<Receiver<String>>>>,
66 }
67 impl Default for Service {
68 fn default() -> Self {
69 Self::new()
70 }
71 }
72 impl Service {
73 /**
74 * Returns a new empty `Service`
75 */
76 pub fn new() -> Self {
77 Self {
78 conf: ServiceConf::default(),
79 proc: None,
80 stdout: None,
81 stderr: None,
82 }
83 }
84 /**
85 * Returns a `Service` made from a `ServiceConf`.
86 */
87 pub fn from_conf(conf: &ServiceConf) -> Self {
88 Self {
89 conf: conf.clone(),
90 proc: None,
91 stdout: None,
92 stderr: None,
93 }
94 }
95 /**
96 * Returns the name of the service
97 */
98 pub async fn name(&self) -> &str {
99 &self.conf.name
100 }
101 /**
102 * Uses `tokio::process::Command` to start the service.
103 */
104 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> {
105 if self.proc.is_some() {
106 return Err(Box::new(std::io::Error::new(
107 std::io::ErrorKind::AlreadyExists,
108 "Process Already Exists",
109 )));
110 }
111 let cmd = &self.conf.command;
112 let mut proc = Command::new(cmd);
113 proc.stdin(Stdio::piped());
114 proc.stdout(Stdio::piped());
115 proc.stderr(Stdio::piped());
116 if let Some(a) = &self.conf.args {
117 proc.args(a.split_whitespace());
118 };
119 if let Some(c) = &self.conf.directory {
120 proc.current_dir(c);
121 };
122 let child = proc.spawn()?;
123 self.proc = Some(Arc::new(Mutex::new(child)));
124 Ok(())
125 }
126 /**
127 * Returns true when process is started and false when process is stopped.
128 */
129 pub async fn started(&self) -> bool {
130 self.proc.is_some()
131 }
132 /**
133 * Invokes kill on the service process
134 */
135 pub async fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> {
136 if let Some(proc) = self.proc.clone() {
137 let mut lock = proc.lock().await;
138 lock.kill().await?;
139 drop(lock);
140 self.proc = None;
141 self.stdout = None;
142 self.stderr = None;
143 Ok(())
144 } else {
145 Err(Box::new(std::io::Error::new(
146 std::io::ErrorKind::NotFound,
147 "No Process Associated with Service",
148 )))
149 }
150 }
151 /**
152 * Restarts service process
153 */
154 pub async fn restart(&mut self) -> Result<(), Box<dyn std::error::Error>> {
155 self.stop().await?;
156 self.start().await?;
157 Ok(())
158 }
159 /**
160 * Takes control of service process' stdout file handle and spawns a new task to continuously
161 * scan it.
162 */
163 pub async fn scan_stdout(&mut self) -> Result<(), Box<dyn std::error::Error>> {
164 if let Some(proc) = self.proc.clone() {
165 let mut lock = proc.lock().await;
166 let stdout = if let Some(stdout) = lock.stdout.take() {
167 stdout
168 } else {
169 return Err(Box::new(std::io::Error::new(
170 std::io::ErrorKind::NotFound,
171 "No stdout handle associated with process",
172 )));
173 };
174 drop(lock);
175 let (tx, rx) = channel(1024);
176 let sname = self.conf.name.clone();
177 spawn(async move {
178 let mut br = BufReader::new(stdout).lines();
179 while let Ok(Some(line)) = br.next_line().await {
180 println!("{} :: {}", &sname, &line);
181 if let Err(_) = tx.send(line).await {
182 return;
183 };
184 }
185 });
186 self.stdout = Some(Arc::new(Mutex::new(rx)));
187 Ok(())
188 } else {
189 Err(Box::new(std::io::Error::new(
190 std::io::ErrorKind::NotFound,
191 "No Process Associated with Service",
192 )))
193 }
194 }
195 /**
196 * Takes control of service process' stderr file handle and spawns a new task to continuously
197 * scan it.
198 */
199 pub async fn scan_stderr(&mut self) -> Result<(), Box<dyn std::error::Error>> {
200 if let Some(proc) = self.proc.clone() {
201 let mut lock = proc.lock().await;
202 let stderr = if let Some(stderr) = lock.stderr.take() {
203 stderr
204 } else {
205 return Err(Box::new(std::io::Error::new(
206 std::io::ErrorKind::NotFound,
207 "No stderr handle associated with process",
208 )));
209 };
210 drop(lock);
211 let (tx, rx) = channel(1024);
212 let sname = self.conf.name.clone();
213 spawn(async move {
214 let mut br = BufReader::new(stderr).lines();
215 while let Ok(Some(line)) = br.next_line().await {
216 eprintln!("{} :: {}", &sname, &line);
217 if let Err(_) = tx.send(line).await {
218 return;
219 };
220 }
221 });
222 self.stderr = Some(Arc::new(Mutex::new(rx)));
223 Ok(())
224 } else {
225 Err(Box::new(std::io::Error::new(
226 std::io::ErrorKind::NotFound,
227 "No Process Associated with Service",
228 )))
229 }
230 }
231 /**
232 * Writes to the service process' stdin, if it exists.
233 */
234 pub async fn write_stdin(&mut self, buf: String) -> Result<(), Box<dyn std::error::Error>> {
235 if let Some(proc) = self.proc.clone() {
236 let mut lock = proc.lock().await;
237 let stdin = if let Some(stdin) = lock.stdin.as_mut() {
238 stdin
239 } else {
240 return Err(Box::new(std::io::Error::new(
241 std::io::ErrorKind::NotFound,
242 "No stdin handle associated with process",
243 )));
244 };
245 stdin.write(&buf.as_bytes()).await?;
246 stdin.flush().await?;
247 Ok(())
248 } else {
249 Err(Box::new(std::io::Error::new(
250 std::io::ErrorKind::NotFound,
251 "No Process Associated with Service",
252 )))
253 }
254 }
255 }