conductor/src/runner.rs

193 lines
5.3 KiB
Rust

use crate::job::{Job, Jobs};
use std::collections::HashMap;
use std::process::exit;
use std::process::Stdio;
use std::time::Duration;
use std::time::Instant;
use actix::prelude::*;
use futures::AsyncReadExt;
use super::term;
use async_process::Command;
use futures::io::AsyncBufReadExt;
use futures::io::BufReader;
/// Progress notifications a `Runner` sends to its manager.
///
/// The leading `i64` of every variant is the id of the runner that sent it.
#[derive(Clone, Message, Debug)]
#[rtype(result = "()")]
pub enum Event {
/// Sent just before the job's child process is spawned.
StartedTask(i64, Job),
/// One line read from the job's piped output.
Output(i64, Job, String),
/// Reading the job's output failed; the `String` is the I/O error text.
Error(i64, Job, String),
/// The job's output stream ended; the `f64` is the seconds elapsed since the job was started.
FinishedTask(i64, Job, f64),
/// The runner has nothing left to run and is stopping.
Completed(i64),
}
/// Actor that runs the jobs in its queue as `sh -c` child processes and
/// reports progress to a manager through `Event` messages.
#[derive(Debug)]
pub struct Runner {
/// Identifier included in every `Event` this runner emits.
pub id: i64,
/// Jobs still waiting to be run.
pub tasks: Jobs,
/// Recipient of this runner's `Event`s.
pub manager: Recipient<Event>,
/// Job most recently started, if any; cleared when the next queued job is scheduled.
pub current_job: Option<Job>,
/// When the current job was started; used to report its duration.
pub current_job_start: Instant,
}
impl Runner {
pub fn new(id: i64, tasks: Jobs, manager: Recipient<Event>) -> Self {
Self {
tasks,
manager,
id,
current_job: None,
current_job_start: Instant::now(),
}
}
}
impl Actor for Runner {
    type Context = Context<Self>;

    /// Kicks off the first queued job when the actor starts, or stops the
    /// actor straight away when the queue is empty.
    fn started(&mut self, ctx: &mut Self::Context) {
        match self.tasks.pop_front() {
            Some(first_job) => ctx.notify(first_job),
            None => ctx.stop(),
        }
    }
}
impl Handler<Job> for Runner {
    type Result = ();

    /// Runs `job` as `sh -c <command>` and wires its output into this actor.
    ///
    /// Steps:
    /// 1. Records `job` as the current job, resets the start time and sends
    ///    `Event::StartedTask` to the manager.
    /// 2. Spawns the shell with the job's environment and working directory,
    ///    with stdout and stderr piped.
    /// 3. Registers the piped output as a line stream handled by the
    ///    `StreamHandler` impl on `Runner`.
    /// 4. Waits for the exit status; a keep-alive job that exited with code 0
    ///    is re-sent to this actor after `job.retry_delay` seconds.
    ///
    /// # Panics
    ///
    /// Panics if the shell cannot be spawned, if the child is missing a piped
    /// stdout/stderr handle, or if waiting for the exit status fails.
    fn handle(&mut self, job: Job, ctx: &mut Self::Context) -> Self::Result {
        self.current_job = Some(job.clone());
        self.current_job_start = Instant::now();
        let _ = self
            .manager
            .do_send(Event::StartedTask(self.id, job.clone()));

        let mut child = Command::new("sh")
            .arg("-c")
            .arg(job.command.to_string())
            .envs(&job.env)
            .current_dir(&job.path)
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()
            .expect("failed to spawn child process");

        let stdout = child
            .stdout
            .take()
            .expect("child did not have a handle to stdout");
        let stderr = child
            .stderr
            .take()
            .expect("child did not have a handle to stderr");

        // `chain` reads stdout to EOF before it yields anything from stderr,
        // so stderr lines are only reported after stdout has closed.
        // NOTE(review): a child that writes a lot to stderr while stdout is
        // still open may block on a full pipe — consider merging two line
        // streams instead.
        ctx.add_stream(BufReader::new(stdout.chain(stderr)).lines());

        let wait_for_exit = async move {
            child
                .status()
                .await
                .expect("child process encountered an error")
        };
        let on_exit =
            actix::fut::wrap_future::<_, Self>(wait_for_exit).map(move |status, _, ctx| {
                // NOTE(review): a keep-alive job that exits non-zero is not
                // rescheduled here, and `finished` does not report `Completed`
                // for keep-alive jobs, so the runner then stays idle — confirm
                // this is intended.
                if job.keep_alive && status.code() == Some(0) {
                    let delay = Duration::from_secs(job.retry_delay);
                    term::header(&job.name, &format!("Restarting in {}s", delay.as_secs()));
                    ctx.notify_later(job, delay);
                }
            });
        ctx.spawn(on_exit);
    }
}
impl StreamHandler<Result<String, std::io::Error>> for Runner {
fn handle(&mut self, item: Result<String, std::io::Error>, _: &mut Self::Context) {
if let Some(ref job) = self.current_job {
let ev = match item {
Ok(v) => Event::Output(self.id, job.clone(), v),
Err(e) => Event::Error(self.id, job.clone(), e.to_string()),
};
let _ = self.manager.do_send(ev);
}
}
fn finished(&mut self, ctx: &mut Self::Context) {
let _ = self.manager.do_send(Event::FinishedTask(
self.id,
self.current_job.clone().unwrap_or_default(),
self.current_job_start.elapsed().as_secs_f64(),
));
if let Some(task) = self.tasks.pop() {
self.current_job = None;
ctx.notify(task);
} else if !self
.current_job
.as_ref()
.map(|i| i.keep_alive)
.unwrap_or(false)
{
let _ = self.manager.do_send(Event::Completed(self.id));
ctx.stop();
}
}
}
/// Actor that starts one `Runner` per submitted job queue, prints the events
/// the runners report, and shuts the process down once every runner completed.
#[derive(Default)]
pub struct Manager {
/// Id given to the most recently created runner; incremented for each new one.
id_count: i64,
/// Runners that have not yet reported `Event::Completed`, keyed by runner id.
runners: HashMap<i64, Addr<Runner>>,
}
impl Manager {
    /// Submits a queue of jobs to the registry-wide `Manager` instance.
    ///
    /// The message is sent with `do_send`, so this returns without waiting
    /// for the manager to handle it.
    pub fn jobs(jobs: Jobs) {
        let manager = Self::from_registry();
        manager.do_send(jobs);
    }
}
// Empty impls that rely on the traits' default methods; `SystemService`
// is what provides the `from_registry()` lookup used by `Manager::jobs`.
impl Supervised for Manager {}
impl SystemService for Manager {}
/// `Manager` runs in a plain `Context`; no lifecycle hooks are overridden.
impl Actor for Manager {
type Context = Context<Self>;
}
impl Handler<Event> for Manager {
type Result = ();
fn handle(&mut self, msg: Event, ctx: &mut Self::Context) -> Self::Result {
match msg {
Event::StartedTask(_, job) => {
term::header(job.name, "Started");
}
Event::Output(_, job, v) => {
term::output(job.name, job.color, v);
}
Event::Error(_, job, v) => {
println!("[{}: {}]", job.name, v);
}
Event::FinishedTask(_, job, time) => {
term::header(job.name, &format!("Finished ({:.2}s)", time));
}
Event::Completed(id) => {
self.runners.remove(&id);
if self.runners.is_empty() {
ctx.stop();
System::current().stop();
exit(0)
}
}
}
}
}
impl Handler<Jobs> for Manager {
    type Result = ();

    /// Starts a new `Runner` for the submitted job queue and registers it
    /// under a freshly allocated id.
    fn handle(&mut self, msg: Jobs, ctx: &mut Self::Context) -> Self::Result {
        self.id_count += 1;
        let runner_id = self.id_count;
        let events_to = ctx.address().recipient();
        let runner = Runner::new(runner_id, msg, events_to);
        self.runners.insert(runner_id, runner.start());
    }
}