193 lines
5.3 KiB
Rust
193 lines
5.3 KiB
Rust
use crate::job::{Job, Jobs};
|
|
use std::collections::HashMap;
|
|
use std::process::exit;
|
|
use std::process::Stdio;
|
|
use std::time::Duration;
|
|
use std::time::Instant;
|
|
|
|
use actix::prelude::*;
|
|
use futures::AsyncReadExt;
|
|
|
|
use super::term;
|
|
use async_process::Command;
|
|
use futures::io::AsyncBufReadExt;
|
|
use futures::io::BufReader;
|
|
|
|
#[derive(Clone, Message, Debug)]
|
|
#[rtype(result = "()")]
|
|
pub enum Event {
|
|
StartedTask(i64, Job),
|
|
Output(i64, Job, String),
|
|
Error(i64, Job, String),
|
|
FinishedTask(i64, Job, f64),
|
|
Completed(i64),
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct Runner {
|
|
pub id: i64,
|
|
pub tasks: Jobs,
|
|
pub manager: Recipient<Event>,
|
|
pub current_job: Option<Job>,
|
|
pub current_job_start: Instant,
|
|
}
|
|
|
|
impl Runner {
|
|
pub fn new(id: i64, tasks: Jobs, manager: Recipient<Event>) -> Self {
|
|
Self {
|
|
tasks,
|
|
manager,
|
|
id,
|
|
current_job: None,
|
|
current_job_start: Instant::now(),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Actor for Runner {
|
|
type Context = Context<Self>;
|
|
|
|
fn started(&mut self, ctx: &mut Self::Context) {
|
|
if let Some(task) = self.tasks.pop_front() {
|
|
ctx.notify(task);
|
|
} else {
|
|
ctx.stop();
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Handler<Job> for Runner {
|
|
type Result = ();
|
|
fn handle(&mut self, job: Job, ctx: &mut Self::Context) -> Self::Result {
|
|
self.current_job = Some(job.clone());
|
|
self.current_job_start = Instant::now();
|
|
let _ = self
|
|
.manager
|
|
.do_send(Event::StartedTask(self.id, job.clone()));
|
|
|
|
let mut child = Command::new("sh")
|
|
.arg("-c")
|
|
.arg(&job.command.to_string())
|
|
.envs(&job.env)
|
|
.current_dir(&job.path)
|
|
.stdout(Stdio::piped())
|
|
.stderr(Stdio::piped())
|
|
.spawn()
|
|
.unwrap();
|
|
|
|
let stdout = child
|
|
.stdout
|
|
.take()
|
|
.expect("child did not have a handle to stdout");
|
|
let stderr = child
|
|
.stderr
|
|
.take()
|
|
.expect("child did not have a handle to stdout");
|
|
ctx.add_stream(BufReader::new(stdout.chain(stderr)).lines());
|
|
let fut = async move {
|
|
child
|
|
.status()
|
|
.await
|
|
.expect("child process encountered an error")
|
|
};
|
|
let fut = actix::fut::wrap_future::<_, Self>(fut).map(move |status, _, ctx| {
|
|
if job.keep_alive && status.code() == Some(0) {
|
|
let delay = Duration::from_secs(job.retry_delay);
|
|
term::header(&job.name, &format!("Restarting in {}s", delay.as_secs()));
|
|
ctx.notify_later(job, delay);
|
|
}
|
|
});
|
|
|
|
ctx.spawn(fut);
|
|
}
|
|
}
|
|
|
|
impl StreamHandler<Result<String, std::io::Error>> for Runner {
|
|
fn handle(&mut self, item: Result<String, std::io::Error>, _: &mut Self::Context) {
|
|
if let Some(ref job) = self.current_job {
|
|
let ev = match item {
|
|
Ok(v) => Event::Output(self.id, job.clone(), v),
|
|
Err(e) => Event::Error(self.id, job.clone(), e.to_string()),
|
|
};
|
|
let _ = self.manager.do_send(ev);
|
|
}
|
|
}
|
|
|
|
fn finished(&mut self, ctx: &mut Self::Context) {
|
|
let _ = self.manager.do_send(Event::FinishedTask(
|
|
self.id,
|
|
self.current_job.clone().unwrap_or_default(),
|
|
self.current_job_start.elapsed().as_secs_f64(),
|
|
));
|
|
if let Some(task) = self.tasks.pop() {
|
|
self.current_job = None;
|
|
ctx.notify(task);
|
|
} else if !self
|
|
.current_job
|
|
.as_ref()
|
|
.map(|i| i.keep_alive)
|
|
.unwrap_or(false)
|
|
{
|
|
let _ = self.manager.do_send(Event::Completed(self.id));
|
|
ctx.stop();
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Default)]
|
|
pub struct Manager {
|
|
id_count: i64,
|
|
runners: HashMap<i64, Addr<Runner>>,
|
|
}
|
|
|
|
impl Manager {
|
|
pub fn jobs(jobs: Jobs) {
|
|
Self::from_registry().do_send(jobs);
|
|
}
|
|
}
|
|
|
|
impl Supervised for Manager {}
|
|
impl SystemService for Manager {}
|
|
|
|
impl Actor for Manager {
|
|
type Context = Context<Self>;
|
|
}
|
|
|
|
impl Handler<Event> for Manager {
|
|
type Result = ();
|
|
|
|
fn handle(&mut self, msg: Event, ctx: &mut Self::Context) -> Self::Result {
|
|
match msg {
|
|
Event::StartedTask(_, job) => {
|
|
term::header(job.name, "Started");
|
|
}
|
|
Event::Output(_, job, v) => {
|
|
term::output(job.name, job.color, v);
|
|
}
|
|
Event::Error(_, job, v) => {
|
|
println!("[{}: {}]", job.name, v);
|
|
}
|
|
Event::FinishedTask(_, job, time) => {
|
|
term::header(job.name, &format!("Finished ({:.2}s)", time));
|
|
}
|
|
Event::Completed(id) => {
|
|
self.runners.remove(&id);
|
|
if self.runners.is_empty() {
|
|
ctx.stop();
|
|
System::current().stop();
|
|
exit(0)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Handler<Jobs> for Manager {
|
|
type Result = ();
|
|
fn handle(&mut self, msg: Jobs, ctx: &mut Self::Context) -> Self::Result {
|
|
self.id_count += 1;
|
|
let addr = Runner::new(self.id_count, msg, ctx.address().recipient()).start();
|
|
self.runners.insert(self.id_count, addr);
|
|
}
|
|
}
|