ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/salaryman/trunk/src/service.rs
Revision: 11
Committed: Wed Jul 9 05:14:23 2025 UTC (3 months ago) by yuzu
File size: 7561 byte(s)
Log Message:
reorganize

File Contents

# Content
1 use serde::{Deserialize, Serialize};
2 use tokio::{
3 io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
4 process::{Child, Command},
5 sync::{
6 Mutex,
7 mpsc::{Receiver, channel},
8 },
9 task::spawn,
10 };
11
12 use std::{path::PathBuf, process::Stdio, sync::Arc};
13
14 #[derive(Serialize, Deserialize, Clone, Debug)]
15 pub struct ServiceConf {
16 pub name: String,
17 pub command: String,
18 pub args: Option<String>,
19 pub directory: Option<PathBuf>,
20 pub autostart: bool,
21 }
22 impl Default for ServiceConf {
23 fn default() -> Self {
24 Self::new()
25 }
26 }
27 impl ServiceConf {
28 /**
29 * Returns a new empty `ServiceConf`
30 */
31 pub fn new() -> Self {
32 Self {
33 name: String::new(),
34 command: String::new(),
35 args: None,
36 directory: None,
37 autostart: false,
38 }
39 }
40 /**
41 * Returns a new `ServiceConf` from parts.
42 */
43 pub fn from_parts(
44 name: String,
45 command: String,
46 args: Option<String>,
47 directory: Option<PathBuf>,
48 autostart: bool,
49 ) -> Self {
50 Self {
51 name,
52 command,
53 args,
54 directory,
55 autostart,
56 }
57 }
58 }
59
60 #[derive(Debug)]
61 pub struct Service {
62 conf: ServiceConf,
63 proc: Option<Arc<Mutex<Child>>>,
64 pub stdout: Option<Arc<Mutex<Receiver<String>>>>,
65 pub stderr: Option<Arc<Mutex<Receiver<String>>>>,
66 }
67 impl Default for Service {
68 fn default() -> Self {
69 Self::new()
70 }
71 }
72 impl Service {
73 /**
74 * Returns a new empty `Service`
75 */
76 pub fn new() -> Self {
77 Self {
78 conf: ServiceConf::default(),
79 proc: None,
80 stdout: None,
81 stderr: None,
82 }
83 }
84 /**
85 * Returns a `Service` made from a `ServiceConf`.
86 */
87 pub fn from_conf(conf: &ServiceConf) -> Self {
88 Self {
89 conf: conf.clone(),
90 proc: None,
91 stdout: None,
92 stderr: None,
93 }
94 }
95 /**
96 * Returns the name of the service
97 */
98 pub async fn name(&self) -> &str {
99 &self.conf.name
100 }
101 /**
102 * Uses `tokio::process::Command` to start the service.
103 */
104 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> {
105 if self.proc.is_some() {
106 return Err(Box::new(std::io::Error::new(
107 std::io::ErrorKind::AlreadyExists,
108 "Process Already Exists",
109 )));
110 }
111 let cmd = &self.conf.command;
112 let mut proc = Command::new(cmd);
113 proc.stdin(Stdio::piped());
114 proc.stdout(Stdio::piped());
115 proc.stderr(Stdio::piped());
116 if let Some(a) = &self.conf.args {
117 proc.args(a.split_whitespace());
118 };
119 if let Some(c) = &self.conf.directory {
120 proc.current_dir(c);
121 };
122 let child = proc.spawn()?;
123 self.proc = Some(Arc::new(Mutex::new(child)));
124 Ok(())
125 }
126 /**
127 * Returns true when process is started and false when process is stopped.
128 */
129 pub async fn started(&self) -> bool {
130 self.proc.is_some()
131 }
132 /**
133 * Invokes kill on the service process
134 */
135 pub async fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> {
136 if let Some(proc) = self.proc.clone() {
137 let mut lock = proc.lock().await;
138 lock.kill().await?;
139 drop(lock);
140 self.proc = None;
141 Ok(())
142 } else {
143 Err(Box::new(std::io::Error::new(
144 std::io::ErrorKind::NotFound,
145 "No Process Associated with Service",
146 )))
147 }
148 }
149 /**
150 * Restarts service process
151 */
152 pub async fn restart(&mut self) -> Result<(), Box<dyn std::error::Error>> {
153 self.stop().await?;
154 self.start().await?;
155 Ok(())
156 }
157 /**
158 * Takes control of service process' stdout file handle and spawns a new task to continuously
159 * scan it.
160 */
161 pub async fn scan_stdout(&mut self) -> Result<(), Box<dyn std::error::Error>> {
162 if let Some(proc) = self.proc.clone() {
163 let mut lock = proc.lock().await;
164 let stdout = if let Some(stdout) = lock.stdout.take() {
165 stdout
166 } else {
167 return Err(Box::new(std::io::Error::new(
168 std::io::ErrorKind::NotFound,
169 "No stdout handle associated with process",
170 )));
171 };
172 drop(lock);
173 let (tx, rx) = channel(100);
174 let sname = self.conf.name.clone();
175 spawn(async move {
176 let mut br = BufReader::new(stdout).lines();
177 while let Ok(Some(line)) = br.next_line().await {
178 println!("{} :: {}", &sname, &line);
179 if let Err(_) = tx.send(line).await {
180 return;
181 };
182 }
183 });
184 self.stdout = Some(Arc::new(Mutex::new(rx)));
185 Ok(())
186 } else {
187 Err(Box::new(std::io::Error::new(
188 std::io::ErrorKind::NotFound,
189 "No Process Associated with Service",
190 )))
191 }
192 }
193 /**
194 * Takes control of service process' stderr file handle and spawns a new task to continuously
195 * scan it.
196 */
197 pub async fn scan_stderr(&mut self) -> Result<(), Box<dyn std::error::Error>> {
198 if let Some(proc) = self.proc.clone() {
199 let mut lock = proc.lock().await;
200 let stderr = if let Some(stderr) = lock.stderr.take() {
201 stderr
202 } else {
203 return Err(Box::new(std::io::Error::new(
204 std::io::ErrorKind::NotFound,
205 "No stderr handle associated with process",
206 )));
207 };
208 drop(lock);
209 let (tx, rx) = channel(100);
210 let sname = self.conf.name.clone();
211 spawn(async move {
212 let mut br = BufReader::new(stderr).lines();
213 while let Ok(Some(line)) = br.next_line().await {
214 eprintln!("{} :: {}", &sname, &line);
215 if let Err(_) = tx.send(line).await {
216 return;
217 };
218 }
219 });
220 self.stderr = Some(Arc::new(Mutex::new(rx)));
221 Ok(())
222 } else {
223 Err(Box::new(std::io::Error::new(
224 std::io::ErrorKind::NotFound,
225 "No Process Associated with Service",
226 )))
227 }
228 }
229 /**
230 * Writes to the service process' stdin, if it exists.
231 */
232 pub async fn write_stdin(&mut self, buf: String) -> Result<(), Box<dyn std::error::Error>> {
233 if let Some(proc) = self.proc.clone() {
234 let mut lock = proc.lock().await;
235 let stdin = if let Some(stdin) = lock.stdin.as_mut() {
236 stdin
237 } else {
238 return Err(Box::new(std::io::Error::new(
239 std::io::ErrorKind::NotFound,
240 "No stdin handle associated with process",
241 )));
242 };
243 stdin.write(&buf.as_bytes()).await?;
244 stdin.flush().await?;
245 Ok(())
246 } else {
247 Err(Box::new(std::io::Error::new(
248 std::io::ErrorKind::NotFound,
249 "No Process Associated with Service",
250 )))
251 }
252 }
253 }