ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/salaryman/trunk/src/service.rs
Revision: 13
Committed: Wed Jul 9 20:24:36 2025 UTC (3 months ago) by yuzu
File size: 8577 byte(s)
Log Message:
working network communication

File Contents

# Content
1 use schemars::JsonSchema;
2 use serde::{Deserialize, Serialize};
3 use tokio::{
4 io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
5 process::{Child, Command},
6 sync::{
7 Mutex,
8 mpsc::{Receiver, channel},
9 },
10 task::spawn,
11 };
12 use uuid::Uuid;
13
14 use std::{path::PathBuf, process::Stdio, sync::Arc};
15
16 #[derive(Serialize, Deserialize, JsonSchema, Clone, Debug)]
17 pub struct ServiceConf {
18 pub uuid: Uuid,
19 pub name: String,
20 pub command: String,
21 pub args: Option<String>,
22 pub directory: Option<PathBuf>,
23 pub autostart: bool,
24 }
25 impl Default for ServiceConf {
26 fn default() -> Self {
27 Self::new()
28 }
29 }
30 impl ServiceConf {
31 /**
32 * Returns a new empty `ServiceConf`
33 */
34 pub fn new() -> Self {
35 Self {
36 uuid: Uuid::new_v4(),
37 name: String::new(),
38 command: String::new(),
39 args: None,
40 directory: None,
41 autostart: false,
42 }
43 }
44 /**
45 * Returns a new `ServiceConf` from parts.
46 */
47 pub fn from_parts(
48 uuid: Uuid,
49 name: String,
50 command: String,
51 args: Option<String>,
52 directory: Option<PathBuf>,
53 autostart: bool,
54 ) -> Self {
55 Self {
56 uuid,
57 name,
58 command,
59 args,
60 directory,
61 autostart,
62 }
63 }
64 /**
65 * Returns a new `ServiceConf` from parts with new uuid.
66 */
67 pub fn new_from_parts(
68 name: String,
69 command: String,
70 args: Option<String>,
71 directory: Option<PathBuf>,
72 autostart: bool,
73 ) -> Self {
74 Self {
75 uuid: Uuid::new_v4(),
76 name,
77 command,
78 args,
79 directory,
80 autostart,
81 }
82 }
83 }
84
85 #[derive(Debug)]
86 pub struct Service {
87 conf: ServiceConf,
88 proc: Option<Arc<Mutex<Child>>>,
89 pub stdout: Option<Arc<Mutex<Receiver<String>>>>,
90 pub stderr: Option<Arc<Mutex<Receiver<String>>>>,
91 }
92 impl Default for Service {
93 fn default() -> Self {
94 Self::new()
95 }
96 }
97 impl Service {
98 /**
99 * Returns a new empty `Service`
100 */
101 pub fn new() -> Self {
102 Self {
103 conf: ServiceConf::default(),
104 proc: None,
105 stdout: None,
106 stderr: None,
107 }
108 }
109 /**
110 * Returns a `Service` made from a `ServiceConf`.
111 */
112 pub fn from_conf(conf: &ServiceConf) -> Self {
113 Self {
114 conf: conf.clone(),
115 proc: None,
116 stdout: None,
117 stderr: None,
118 }
119 }
120 /**
121 * Returns the name of the service
122 */
123 pub async fn name(&self) -> &str {
124 &self.conf.name
125 }
126 /**
127 * Uses `tokio::process::Command` to start the service.
128 */
129 pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> {
130 if self.proc.is_some() {
131 return Err(Box::new(std::io::Error::new(
132 std::io::ErrorKind::AlreadyExists,
133 "Process Already Exists",
134 )));
135 }
136 let cmd = &self.conf.command;
137 let mut proc = Command::new(cmd);
138 proc.stdin(Stdio::piped());
139 proc.stdout(Stdio::piped());
140 proc.stderr(Stdio::piped());
141 if let Some(a) = &self.conf.args {
142 proc.args(a.split_whitespace());
143 };
144 if let Some(c) = &self.conf.directory {
145 proc.current_dir(c);
146 };
147 let child = proc.spawn()?;
148 self.proc = Some(Arc::new(Mutex::new(child)));
149 Ok(())
150 }
151 /**
152 * Returns true when process is started and false when process is stopped.
153 */
154 pub async fn started(&self) -> bool {
155 self.proc.is_some()
156 }
157 /**
158 * Invokes kill on the service process
159 */
160 pub async fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> {
161 if let Some(proc) = self.proc.clone() {
162 let mut lock = proc.lock().await;
163 lock.kill().await?;
164 drop(lock);
165 self.proc = None;
166 self.stdout = None;
167 self.stderr = None;
168 Ok(())
169 } else {
170 Err(Box::new(std::io::Error::new(
171 std::io::ErrorKind::NotFound,
172 "No Process Associated with Service",
173 )))
174 }
175 }
176 /**
177 * Restarts service process
178 */
179 pub async fn restart(&mut self) -> Result<(), Box<dyn std::error::Error>> {
180 self.stop().await?;
181 self.start().await?;
182 Ok(())
183 }
184 /**
185 * Takes control of service process' stdout file handle and spawns a new task to continuously
186 * scan it.
187 */
188 pub async fn scan_stdout(&mut self) -> Result<(), Box<dyn std::error::Error>> {
189 if let Some(proc) = self.proc.clone() {
190 let mut lock = proc.lock().await;
191 let stdout = if let Some(stdout) = lock.stdout.take() {
192 stdout
193 } else {
194 return Err(Box::new(std::io::Error::new(
195 std::io::ErrorKind::NotFound,
196 "No stdout handle associated with process",
197 )));
198 };
199 drop(lock);
200 let (tx, rx) = channel(1024);
201 let sname = self.conf.name.clone();
202 let suuid = self.conf.uuid.clone();
203 spawn(async move {
204 let mut br = BufReader::new(stdout).lines();
205 while let Ok(Some(line)) = br.next_line().await {
206 println!("{} ({}) :: {}", &suuid, &sname, &line);
207 if let Err(_) = tx.send(line).await {
208 return;
209 };
210 }
211 });
212 self.stdout = Some(Arc::new(Mutex::new(rx)));
213 Ok(())
214 } else {
215 Err(Box::new(std::io::Error::new(
216 std::io::ErrorKind::NotFound,
217 "No Process Associated with Service",
218 )))
219 }
220 }
221 /**
222 * Takes control of service process' stderr file handle and spawns a new task to continuously
223 * scan it.
224 */
225 pub async fn scan_stderr(&mut self) -> Result<(), Box<dyn std::error::Error>> {
226 if let Some(proc) = self.proc.clone() {
227 let mut lock = proc.lock().await;
228 let stderr = if let Some(stderr) = lock.stderr.take() {
229 stderr
230 } else {
231 return Err(Box::new(std::io::Error::new(
232 std::io::ErrorKind::NotFound,
233 "No stderr handle associated with process",
234 )));
235 };
236 drop(lock);
237 let (tx, rx) = channel(1024);
238 let sname = self.conf.name.clone();
239 let suuid = self.conf.uuid.clone();
240 spawn(async move {
241 let mut br = BufReader::new(stderr).lines();
242 while let Ok(Some(line)) = br.next_line().await {
243 eprintln!("{} ({}) >< {}", &suuid, &sname, &line);
244 if let Err(_) = tx.send(line).await {
245 return;
246 };
247 }
248 });
249 self.stderr = Some(Arc::new(Mutex::new(rx)));
250 Ok(())
251 } else {
252 Err(Box::new(std::io::Error::new(
253 std::io::ErrorKind::NotFound,
254 "No Process Associated with Service",
255 )))
256 }
257 }
258 /**
259 * Writes to the service process' stdin, if it exists.
260 */
261 pub async fn write_stdin(&mut self, buf: String) -> Result<(), Box<dyn std::error::Error>> {
262 if let Some(proc) = self.proc.clone() {
263 let mut lock = proc.lock().await;
264 let stdin = if let Some(stdin) = lock.stdin.as_mut() {
265 stdin
266 } else {
267 return Err(Box::new(std::io::Error::new(
268 std::io::ErrorKind::NotFound,
269 "No stdin handle associated with process",
270 )));
271 };
272 stdin.write(&buf.as_bytes()).await?;
273 stdin.flush().await?;
274 Ok(())
275 } else {
276 Err(Box::new(std::io::Error::new(
277 std::io::ErrorKind::NotFound,
278 "No Process Associated with Service",
279 )))
280 }
281 }
282 /**
283 * Writes a line to the service process' stdin, if it exists.
284 */
285 #[inline]
286 pub async fn writeln_stdin(&mut self, buf: String) -> Result<(), Box<dyn std::error::Error>> {
287 self.write_stdin(format!("{}\n", buf)).await
288 }
289 }