conductor/src/runner.rs

190 lines
5.0 KiB
Rust

use crate::job::{Job, Jobs};
use std::collections::HashMap;
use std::process::exit;
use std::process::Stdio;
use std::time::Duration;
use actix::prelude::*;
use async_process::Child;
use async_process::Command;
use futures::io::AsyncBufReadExt;
use futures::io::BufReader;
#[derive(Clone, Message, Debug)]
#[rtype(result = "()")]
pub enum Event {
StartedTask(i64, String),
Output(i64, String, String),
Error(i64, String, String),
FinishedTask(i64, String),
Completed(i64),
}
#[derive(Debug)]
pub struct Runner {
pub id: i64,
pub tasks: Jobs,
pub manager: Recipient<Event>,
child: Option<Child>,
pub current_job: Option<Job>,
}
impl Runner {
pub fn new(id: i64, tasks: Jobs, manager: Recipient<Event>) -> Self {
Self {
tasks,
manager,
id,
current_job: None,
child: None,
}
}
}
impl Actor for Runner {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
if let Some(task) = self.tasks.pop() {
ctx.notify(task);
} else {
ctx.stop();
}
}
}
impl Handler<Job> for Runner {
type Result = ();
fn handle(&mut self, job: Job, ctx: &mut Self::Context) -> Self::Result {
self.current_job = Some(job.clone());
let _ = self
.manager
.do_send(Event::StartedTask(self.id, job.name.clone()));
let mut child = Command::new("sh")
.arg("-c")
.arg(&job.command)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.unwrap();
let stdout = child
.stdout
.take()
.expect("child did not have a handle to stdout");
let reader = BufReader::new(stdout).lines();
let fut = async move {
child
.status()
.await
.expect("child process encountered an error")
};
let fut = actix::fut::wrap_future::<_, Self>(fut).map(move |status, _, ctx| {
if job.keep_alive && status.code() == Some(0) {
let delay = Duration::from_secs(job.retry_delay);
arkham::vox::header(format!("Restarting {} in {}s", job.name, delay.as_secs()));
ctx.notify_later(job, delay);
}
});
ctx.spawn(fut);
ctx.add_stream(reader);
}
}
impl StreamHandler<Result<String, std::io::Error>> for Runner {
fn handle(&mut self, item: Result<String, std::io::Error>, _: &mut Self::Context) {
let ev = match item {
Ok(v) => Event::Output(
self.id,
self.current_job
.as_ref()
.map(|i| i.name.clone())
.unwrap_or_else(|| String::from("UNKNOWN")),
v,
),
Err(e) => Event::Error(
self.id,
self.current_job.as_ref().unwrap().name.clone(),
e.to_string(),
),
};
let _ = self.manager.do_send(ev);
}
fn finished(&mut self, ctx: &mut Self::Context) {
let _ = self.manager.do_send(Event::FinishedTask(
self.id,
self.current_job.as_ref().unwrap().name.clone(),
));
if let Some(task) = self.tasks.pop() {
self.current_job = None;
ctx.notify(task);
} else if !self
.current_job
.as_ref()
.map(|i| i.keep_alive)
.unwrap_or(false)
{
let _ = self.manager.do_send(Event::Completed(self.id));
ctx.stop();
}
}
}
#[derive(Default)]
pub struct Manager {
id_count: i64,
runners: HashMap<i64, Addr<Runner>>,
}
impl Manager {
pub fn new() -> Self {
Self::default()
}
}
impl Actor for Manager {
type Context = Context<Self>;
}
impl Handler<Event> for Manager {
type Result = ();
fn handle(&mut self, msg: Event, ctx: &mut Self::Context) -> Self::Result {
match msg {
Event::StartedTask(_, name) => {
arkham::vox::header(format!("{} - Started", name));
}
Event::Output(_, name, v) => {
println!("[{}] {}", name, v)
}
Event::Error(_, v, name) => {
println!("[{}] Error: {}", name, v);
}
Event::FinishedTask(_, name) => {
arkham::vox::header(format!("{} - Finished", name));
}
Event::Completed(id) => {
self.runners.remove(&id);
if self.runners.is_empty() {
ctx.stop();
System::current().stop();
exit(0)
}
}
}
}
}
impl Handler<Jobs> for Manager {
type Result = ();
fn handle(&mut self, msg: Jobs, ctx: &mut Self::Context) -> Self::Result {
self.id_count += 1;
let addr = Runner::new(self.id_count, msg, ctx.address().recipient()).start();
self.runners.insert(self.id_count, addr);
}
}