ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/salaryman/trunk/src/service.rs
Revision: 8
Committed: Tue Jul 8 01:49:14 2025 UTC (3 months ago) by yuzu
Original Path: trunk/src/model.rs
File size: 7650 byte(s)
Log Message:
go fully async

File Contents

# Content
1 use serde::{Deserialize, Serialize};
2 use tokio::{
3 io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
4 process::{Child, Command},
5 sync::{
6 Mutex,
7 mpsc::{Receiver, channel},
8 },
9 task::spawn,
10 };
11
12 use std::{path::PathBuf, process::Stdio, sync::Arc};
13
14 #[derive(Serialize, Deserialize, Clone, Debug)]
15 pub struct ServiceConf {
16 pub name: String,
17 pub command: String,
18 pub args: Option<String>,
19 pub directory: Option<PathBuf>,
20 pub autostart: bool,
21 }
22 impl Default for ServiceConf {
23 fn default() -> Self {
24 Self::new()
25 }
26 }
27 impl ServiceConf {
28 /**
29 * Returns a new empty `ServiceConf`
30 */
31 pub fn new() -> Self {
32 Self {
33 name: String::new(),
34 command: String::new(),
35 args: None,
36 directory: None,
37 autostart: false,
38 }
39 }
40 /**
41 * Returns a new `ServiceConf` from parts.
42 */
43 pub fn from_parts(
44 name: String,
45 command: String,
46 args: Option<String>,
47 directory: Option<PathBuf>,
48 autostart: bool,
49 ) -> Self {
50 Self {
51 name,
52 command,
53 args,
54 directory,
55 autostart,
56 }
57 }
58 }
59
60 #[derive(Debug)]
61 pub struct Service {
62 conf: ServiceConf,
63 proc: Option<Arc<Mutex<Child>>>,
64 stdout: Option<Arc<Mutex<Receiver<String>>>>,
65 stderr: Option<Arc<Mutex<Receiver<String>>>>,
66 }
67 impl Default for Service {
68 fn default() -> Self {
69 Self::new()
70 }
71 }
72 impl Service {
73 /**
74 * Returns a new empty `Service`
75 */
76 pub fn new() -> Self {
77 Self {
78 conf: ServiceConf::default(),
79 proc: None,
80 stdout: None,
81 stderr: None,
82 }
83 }
84 /**
85 * Returns a `Service` made from a `ServiceConf`.
86 */
87 pub fn from_conf(conf: &ServiceConf) -> Self {
88 Self {
89 conf: conf.clone(),
90 proc: None,
91 stdout: None,
92 stderr: None,
93 }
94 }
95 /**
96 * Returns the name of the service
97 */
98 pub async fn name(&self) -> &str {
99 &self.conf.name
100 }
101 /**
102 * Uses `tokio::process::Command` to start the service.
103 */
104 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> {
105 if self.proc.is_some() {
106 return Err(Box::new(std::io::Error::new(
107 std::io::ErrorKind::AlreadyExists,
108 "Process Already Exists",
109 )));
110 }
111 let cmd = &self.conf.command;
112 let args = if let Some(a) = &self.conf.args {
113 a.split_whitespace()
114 } else {
115 "".split_whitespace()
116 };
117 let cwd = if let Some(c) = &self.conf.directory {
118 c
119 } else {
120 &PathBuf::from("/")
121 };
122 let child = Command::new(cmd)
123 .args(args)
124 .current_dir(cwd)
125 .stdin(Stdio::piped())
126 .stdout(Stdio::piped())
127 .stderr(Stdio::piped())
128 .spawn()?;
129 self.proc = Some(Arc::new(Mutex::new(child)));
130 Ok(())
131 }
132 /**
133 * Returns true when process is started and false when process is stopped.
134 */
135 pub async fn started(&self) -> bool {
136 self.proc.is_some()
137 }
138 /**
139 * Invokes kill on the service process
140 */
141 pub async fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> {
142 if let Some(proc) = self.proc.clone() {
143 let mut lock = proc.lock().await;
144 lock.kill().await?;
145 drop(lock);
146 self.proc = None;
147 Ok(())
148 } else {
149 Err(Box::new(std::io::Error::new(
150 std::io::ErrorKind::NotFound,
151 "No Process Associated with Service",
152 )))
153 }
154 }
155 /**
156 * Restarts service process
157 */
158 pub async fn restart(&mut self) -> Result<(), Box<dyn std::error::Error>> {
159 self.stop().await?;
160 self.start().await?;
161 Ok(())
162 }
163 /**
164 * Takes control of service process' stdout file handle and spawns a new task to continuously
165 * scan it.
166 */
167 pub async fn scan_stdout(&mut self) -> Result<(), Box<dyn std::error::Error>> {
168 if let Some(proc) = self.proc.clone() {
169 let mut lock = proc.lock().await;
170 let stdout = if let Some(stdout) = lock.stdout.take() {
171 stdout
172 } else {
173 return Err(Box::new(std::io::Error::new(
174 std::io::ErrorKind::NotFound,
175 "No stdout handle associated with process",
176 )));
177 };
178 drop(lock);
179 let (tx, rx) = channel(100);
180 let sname = self.conf.name.clone();
181 spawn(async move {
182 let mut br = BufReader::new(stdout).lines();
183 while let Ok(Some(line)) = br.next_line().await {
184 println!("{} :: {}", &sname, &line);
185 if let Err(_) = tx.send(line).await {
186 return;
187 };
188 }
189 });
190 self.stdout = Some(Arc::new(Mutex::new(rx)));
191 Ok(())
192 } else {
193 Err(Box::new(std::io::Error::new(
194 std::io::ErrorKind::NotFound,
195 "No Process Associated with Service",
196 )))
197 }
198 }
199 /**
200 * Takes control of service process' stderr file handle and spawns a new task to continuously
201 * scan it.
202 */
203 pub async fn scan_stderr(&mut self) -> Result<(), Box<dyn std::error::Error>> {
204 if let Some(proc) = self.proc.clone() {
205 let mut lock = proc.lock().await;
206 let stderr = if let Some(stderr) = lock.stderr.take() {
207 stderr
208 } else {
209 return Err(Box::new(std::io::Error::new(
210 std::io::ErrorKind::NotFound,
211 "No stderr handle associated with process",
212 )));
213 };
214 drop(lock);
215 let (tx, rx) = channel(100);
216 let sname = self.conf.name.clone();
217 spawn(async move {
218 let mut br = BufReader::new(stderr).lines();
219 while let Ok(Some(line)) = br.next_line().await {
220 println!("ERR :: {} :: {}", &sname, &line);
221 if let Err(_) = tx.send(line).await {
222 return;
223 };
224 }
225 });
226 self.stderr = Some(Arc::new(Mutex::new(rx)));
227 Ok(())
228 } else {
229 Err(Box::new(std::io::Error::new(
230 std::io::ErrorKind::NotFound,
231 "No Process Associated with Service",
232 )))
233 }
234 }
235 /**
236 * Writes to the service process' stdin, if it exists.
237 */
238 pub async fn write_stdin(&mut self, buf: String) -> Result<(), Box<dyn std::error::Error>> {
239 if let Some(proc) = self.proc.clone() {
240 let mut lock = proc.lock().await;
241 let stdin = if let Some(stdin) = lock.stdin.as_mut() {
242 stdin
243 } else {
244 return Err(Box::new(std::io::Error::new(
245 std::io::ErrorKind::NotFound,
246 "No stdin handle associated with process",
247 )));
248 };
249 stdin.write(&buf.as_bytes()).await?;
250 Ok(())
251 } else {
252 Err(Box::new(std::io::Error::new(
253 std::io::ErrorKind::NotFound,
254 "No Process Associated with Service",
255 )))
256 }
257 }
258 }