From 43af1fd35d494fe11e9ae096615255cad6131b2b Mon Sep 17 00:00:00 2001 From: Joe Bellus Date: Sun, 25 Sep 2022 18:06:34 +0000 Subject: [PATCH] 2.0 Refactor (#2) Reviewed-on: https://git.5sigma.io/conductor/conductor/pulls/2 --- .drone.yml | 23 +- Cargo.lock | 197 ++++++---------- Cargo.toml | 10 +- conductor.yml | 28 +++ rustfmt.toml | 1 + src/definition.rs | 564 ++++++++++++++++++++++++++++++---------------- src/job.rs | 41 +++- src/main.rs | 51 +++-- src/runner.rs | 83 +++---- src/term.rs | 84 +++++++ 10 files changed, 679 insertions(+), 403 deletions(-) create mode 100644 conductor.yml create mode 100644 rustfmt.toml create mode 100644 src/term.rs diff --git a/.drone.yml b/.drone.yml index 883ab99..0395bb9 100644 --- a/.drone.yml +++ b/.drone.yml @@ -3,7 +3,24 @@ name: default steps: - name: test - image: rust:1.60 + image: rust:latest commands: - - cargo build --verbose --all - - cargo test --verbose --all \ No newline at end of file + - rustup component add clippy + - cargo clippy + - cargo test --all + +- name: deploy + image: rust:latest + commands: + - cargo build --release + - tar cvzf conductor.tar.gz -C target/release conductor + - wget https://dl.min.io/client/mc/release/linux-amd64/mc + - chmod +x mc + - ./mc alias set fivesigma https://objects.5sigma.io $MINIOID $MINIOSECRET + - ./mc cp conductor.tar.gz fivesigma/public/conductor.tar.gz + when: + event: + - promote + target: + - staging + - production diff --git a/Cargo.lock b/Cargo.lock index d1c146c..3dffffb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -59,21 +59,19 @@ dependencies = [ ] [[package]] -name = "aho-corasick" -version = "0.7.18" +name = "ansi_term" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f" +checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" dependencies = [ - "memchr", + "winapi", ] [[package]] -name = "arkham" -version = "0.1.1" -dependencies = [ - "crossterm", - "textwrap", -] +name = "anyhow" +version = "1.0.65" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98161a4e3e2184da77bb14f02184cdd111e83bbbcc9979dfee3c44b9a85f5602" [[package]] name = "async-channel" @@ -195,14 +193,17 @@ dependencies = [ [[package]] name = "conductor" -version = "0.1.0" +version = "2.0.0" dependencies = [ "actix", "actix-rt", - "arkham", + "ansi_term", + "anyhow", "async-process", "expand_str", + "fake-tty", "futures", + "rand", "serde", "serde_yaml", ] @@ -227,32 +228,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "crossterm" -version = "0.22.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c85525306c4291d1b73ce93c8acf9c339f9b213aef6c1d85c3830cbf1c16325c" -dependencies = [ - "bitflags", - "crossterm_winapi", - "futures-core", - "libc", - "mio 0.7.14", - "parking_lot 0.11.2", - "signal-hook", - "signal-hook-mio", - "winapi", -] - -[[package]] -name = "crossterm_winapi" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ae1b35a484aa10e07fe0638d02301c5ad24de82d310ccbd2f3693da5f09bf1c" -dependencies = [ - "winapi", -] - [[package]] name = "event-listener" version = "2.5.2" @@ -265,6 +240,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7bfbc9fbd454fca65e24c398c860da7bf0b76d0d4e62eb89e2e72d69e18a0e4" +[[package]] +name = "fake-tty" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa6c2a740a5d6940f90a0f13b5828440c2a7160bd1e235cf934d5df0e7a3e1ad" + [[package]] name = "fastrand" version = "1.7.0" @@ -378,6 +359,17 @@ dependencies = [ "slab", ] +[[package]] +name = "getrandom" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "hashbrown" version = "0.12.1" @@ -440,19 +432,6 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" -[[package]] -name = "mio" -version = "0.7.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc" -dependencies = [ - "libc", - "log", - "miow", - "ntapi", - "winapi", -] - [[package]] name = "mio" version = "0.8.4" @@ -465,24 +444,6 @@ dependencies = [ "windows-sys", ] -[[package]] -name = "miow" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" -dependencies = [ - "winapi", -] - -[[package]] -name = "ntapi" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f" -dependencies = [ - "winapi", -] - [[package]] name = "once_cell" version = "1.12.0" @@ -568,6 +529,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "ppv-lite86" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" + [[package]] name = "proc-macro2" version = "1.0.40" @@ -586,6 +553,36 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + [[package]] name = "redox_syscall" version = "0.2.13" @@ -595,23 +592,6 @@ dependencies = [ "bitflags", ] -[[package]] -name = "regex" -version = "1.5.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d83f127d94bdbcda4c8cc2e50f6f84f4b611f69c902699ca385a39c3a75f9ff1" -dependencies = [ - "aho-corasick", - "memchr", - "regex-syntax", -] - -[[package]] -name = "regex-syntax" -version = "0.6.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49b3de9ec5dc0a3417da371aab17d729997c15010e7fd24ff707773a33bddb64" - [[package]] name = "ryu" version = "1.0.10" @@ -666,17 +646,6 @@ dependencies = [ "signal-hook-registry", ] -[[package]] -name = "signal-hook-mio" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af" -dependencies = [ - "libc", - "mio 0.7.14", - "signal-hook", -] - [[package]] name = "signal-hook-registry" version = "1.4.0" @@ -698,12 +667,6 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" -[[package]] -name = "smawk" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f67ad224767faa3c7d8b6d91985b78e70a1324408abcb1cfcc2be4c06bc06043" - [[package]] name = "socket2" version = "0.4.4" @@ -725,17 +688,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "textwrap" -version = "0.14.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0066c8d12af8b5acd21e00547c3797fde4e8677254a7ee429176ccebbe93dd80" -dependencies = [ - "smawk", - "unicode-linebreak", - "unicode-width", -] - [[package]] name = "tokio" version = "1.19.2" @@ -745,7 +697,7 @@ dependencies = [ "bytes", "libc", "memchr", - "mio 0.8.4", + "mio", "once_cell", "parking_lot 0.12.1", "pin-project-lite", @@ -774,21 +726,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5bd2fe26506023ed7b5e1e315add59d6f584c621d037f9368fea9cfb988f368c" -[[package]] -name = "unicode-linebreak" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a52dcaab0c48d931f7cc8ef826fa51690a08e1ea55117ef26f89864f532383f" -dependencies = [ - "regex", -] - -[[package]] -name = "unicode-width" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973" - [[package]] name = "waker-fn" version = "1.1.0" diff --git a/Cargo.toml b/Cargo.toml index 1f7ac10..4f48522 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "conductor" -version = "0.1.0" -edition = "2018" +version = "2.0.0" +edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -13,6 +13,8 @@ actix = "0.12.0" actix-rt = "2.4.0" async-process = "1.3.0" futures = "0.3.17" -arkham = { path = "../arkham" } - +ansi_term = "0.12.1" +rand = "0.8.5" +anyhow = "1.0.65" +fake-tty = "0.3.1" diff --git a/conductor.yml b/conductor.yml new file mode 100644 index 0000000..fac6662 --- /dev/null +++ b/conductor.yml @@ -0,0 +1,28 @@ +groups: + main: + description: Main test group + components: + - ls + - currentdir + - envtest +components: + ls: + command: exa + tasks: + beforelist: + command: echo "before list" + currentdir: + command: pwd + envtest: + env: + FOO: one + command: "echo value: $FOO" + +tasks: + sleep: + command: echo "sleeping" + build-release: + command: cargo build --release + install: + before: build-release + command: cp target/release/conductor ~/.local/bin/ diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..5f22f2e --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1 @@ +edition="2021" \ No newline at end of file diff --git a/src/definition.rs b/src/definition.rs index a89286a..1a129d9 100644 --- a/src/definition.rs +++ b/src/definition.rs @@ -1,30 +1,101 @@ use crate::job::{Job, Jobs}; +use rand::Rng; use serde::{Deserialize, Serialize}; use std::collections::HashMap; +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(untagged)] +pub enum Command { + Single(String), + Multiple(Vec), +} + +impl Default for Command { + fn default() -> Self { + Self::Single(String::new()) + } +} + +impl From for Command { + fn from(source: String) -> Self { + Self::Single(source) + } +} + +impl From<&str> for Command { + fn from(source: &str) -> Self { + Self::Single(source.to_string()) + } +} + +impl std::fmt::Display for Command { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Single(v) => write!(f, "{}", &v), + Self::Multiple(v) => write!(f, "{}", &v.join(" && ")), + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(untagged)] +pub enum Dependencies { + Single(String), + Multiple(Vec), +} + +impl Dependencies { + pub fn to_vec(&self) -> Vec { + match self { + Dependencies::Single(v) => vec![v.clone()], + Dependencies::Multiple(v) => v.clone(), + } + } +} + +impl Default for Dependencies { + fn default() -> Self { + Self::Multiple(vec![]) + } +} + #[derive(Serialize, Deserialize, Default)] #[serde(default)] +/// Represents an entire configuration. This is deserialized from the complete conductor.yml file. pub struct Project { + /// The components that are defined within the project. This is stored as a map with the name of + /// of the component and its definition. pub components: HashMap, - pub groups: Vec, + /// The list of groups defined in the project. These groups are stored as a map with the name of + /// the group and its definition. + pub groups: HashMap, + /// A map of values that are populated into the environment for all components and tasks. + /// This value is inclusive with any environment variables defined in the individual task or + /// component. + /// + /// If the same value is defined here and within the task/component definition. The value + /// defined in the task/component will take priority. pub env: HashMap, - pub tasks: Vec, + /// The tasks defined inside project. This is stored as a map with the task name and task + /// definition + pub tasks: HashMap, } impl Project { + /// Retreive a list of jobs from the name of a task. This function expects the fully qualified + /// task name. For subcomponents that means component::task. + /// The jobs structure returned contains a seuqential execution list with all dependencies fn get_absolute_task(&self, name: &str) -> Option { self.tasks - .iter() - .find(|t| t.name == name) + .get(name) .map(|task| { - let job = self.build_project_task_job(task); + let job = self.build_project_task_job(name, task); let mut jobs = task .before + .to_vec() .iter() - .map(|name| self.get_absolute_task(name)) - .flatten() - .map(|jobs| jobs.to_vec()) - .flatten() + .flat_map(|name| self.get_absolute_task(name)) + .flat_map(|jobs| jobs.to_vec()) .collect::>(); jobs.push(job); Jobs::new(jobs) @@ -34,16 +105,16 @@ impl Project { component .tasks .iter() - .find(|t| format!("{}:{}", c_name, t.name) == name) - .map(|task| { - let job = self.build_component_task_job(c_name, component, task); + .find(|(t_name, _task)| format!("{}:{}", c_name, t_name) == name) + .map(|(t_name, task)| { + let job = + self.build_component_task_job(c_name, t_name, component, task); let mut jobs = task .before + .to_vec() .iter() - .map(|name| self.get_absolute_task(name)) - .flatten() - .map(|jobs| jobs.to_vec()) - .flatten() + .flat_map(|name| self.get_absolute_task(name)) + .flat_map(|jobs| jobs.to_vec()) .collect::>(); jobs.push(job); Jobs::new(jobs) @@ -52,48 +123,44 @@ impl Project { }) } + /// Retrieves a job based on a relative task definition. The task is found by the direct task name + /// and a component name separately. fn get_relative_task(&self, task_name: &str, component_name: &str) -> Option { if let Some(component) = self.components.get(component_name) { - component - .tasks - .iter() - .find(|t| t.name == task_name) - .map(|task| { - let job = self.build_component_task_job(component_name, component, task); - let mut jobs = task - .before - .iter() - .map(|name| self.get_by_name(name)) - .flatten() - .map(|jobs| jobs.to_vec()) - .flatten() - .collect::>(); - jobs.push(job); - Jobs::new(jobs) - }) + component.tasks.get(task_name).map(|task| { + let job = self.build_component_task_job(component_name, task_name, component, task); + let mut jobs = task + .before + .to_vec() + .iter() + .flat_map(|name| self.get_by_name(name)) + .flat_map(|jobs| jobs.to_vec()) + .collect::>(); + jobs.push(job); + Jobs::new(jobs) + }) } else { None } } + /// Retrieves a job based on a component definition fn get_component(&self, component_name: &str) -> Option { self.components.get(component_name).map(|c| { let component_job = self.build_component_job(component_name, c); let mut absolute_tasks = c .before + .to_vec() .iter() - .map(|name| self.get_absolute_task(name)) - .flatten() - .map(|jobs| jobs.to_vec()) - .flatten() + .flat_map(|name| self.get_absolute_task(name)) + .flat_map(|jobs| jobs.to_vec()) .collect::>(); let mut relative_tasks = c .before + .to_vec() .iter() - .map(|task_name| self.get_relative_task(task_name, &component_name)) - .flatten() - .map(|jobs| jobs.to_vec()) - .flatten() + .flat_map(|task_name| self.get_relative_task(task_name, component_name)) + .flat_map(|jobs| jobs.to_vec()) .collect::>(); absolute_tasks.append(&mut relative_tasks); @@ -102,39 +169,49 @@ impl Project { }) } + /// Retrieves a vector of jobs based on a group definition. + /// The return value is similar to `get_jobplan`, its is effectively a two-dimensional + /// array representing parallel and sequential jobs. fn get_group(&self, name: &str) -> Option> { - self.groups.iter().find(|g| g.name == name).map(|group| { + self.groups.get(name).map(|group| { group .components .iter() - .map(|name| self.get_by_name(&name)) - .flatten() + .flat_map(|name| self.get_by_name(name)) .collect() }) } + /// Converts a component definition to a job definition fn build_component_job(&self, name: &str, c: &Component) -> Job { let mut env = self.env.clone(); env.extend(c.env.clone()); Job { name: name.to_string(), + command: c.command.clone(), + color: c.color, + path: c.path.clone().unwrap_or_else(|| String::from(".")), env, ..Job::default() } } - fn build_project_task_job(&self, task: &TaskDefinition) -> Job { + /// Converts a root level task definition to a job definition + fn build_project_task_job(&self, name: &str, task: &TaskDefinition) -> Job { let mut env = self.env.clone(); env.extend(task.env.clone()); Job { - name: task.name.clone(), - ..Job::default() + env, + name: name.to_string(), + ..Job::from(task) } } + /// Converts a component level task definition to a job definition fn build_component_task_job( &self, - cmp_name: &str, + component_name: &str, + task_name: &str, component: &Component, task: &TaskDefinition, ) -> Job { @@ -142,77 +219,120 @@ impl Project { env.extend(component.env.clone()); env.extend(task.env.clone()); Job { - name: format!("{}:{}", cmp_name.to_string(), task.name.clone()), env, - ..Job::default() + name: format!("{}:{}", component_name, task_name), + ..Job::from(task) } } + /// Returns a list of jobs for an object by name. This name could be a task or component and + /// the jobs returned will be the matched item with all of its dependencies pub fn get_by_name(&self, name: &str) -> Option { self.get_component(name) .or_else(|| self.get_absolute_task(name)) - .or_else(|| { - self.get_group(name).map(|v| { - v.iter() - .map(|jobs| jobs.to_vec()) - .flatten() - .collect::>() - .into() - }) - }) } - pub fn from_str(s: &str) -> Self { - serde_yaml::from_str(s).unwrap() + /// Returns a vector of jobs for an object by name. This name could be a task, component, or + /// or group. The vector contains jobs that can be run in parallel. The individual jobs are run + /// in sequence. + pub fn get_runplan(&self, name: &str) -> Option> { + self.get_by_name(name) + .map(|i| vec![i]) + .or_else(|| self.get_group(name)) + } + + /// Load a project from a yaml string + pub fn load_str(s: &str) -> anyhow::Result { + Ok(serde_yaml::from_str(s)?) } } -#[derive(Serialize, Deserialize, Default)] +/// Groups are used to define a series of components that can be ran in parallel. +#[derive(Serialize, Deserialize, Default, Clone)] #[serde(default)] pub struct Group { - name: String, - components: Vec, + /// A list of component names to run when the group is launched + pub components: Vec, + /// A description for the group that is displayed in the task list + pub description: Option, } -#[derive(Serialize, Deserialize)] +/// A component represents a long running task. +/// In general, these are things that need to be kept alive during the +/// the operation. +#[derive(Serialize, Deserialize, Clone)] #[serde(default)] pub struct Component { + /// A description for the component that is displayed in the task list + pub description: Option, + /// A map of environment settings that are set before launching the component pub env: HashMap, - pub command: String, + /// The command(s) to execute when the component is launched + pub command: Command, + /// The working path the commands are executed in. This can be relative to the conductor.yaml pub path: Option, - pub retry: bool, + /// If true the component will be relaunched after it exits pub keep_alive: bool, + /// The number of seconds to wait before relaunching the component pub retry_delay: u64, - pub before: Vec, - pub tasks: Vec, + /// The tasks that should be ran before the component's commands + pub before: Dependencies, + /// A map of component level tasks. Tasks can be placed under the component for orgnaizational purposes. + /// They can be ran and referenced from anywhere using an absolute namespace: component::task + pub tasks: HashMap, + /// The color to use for the component name in the terminal output from the command. + pub color: (u8, u8, u8), } impl Default for Component { fn default() -> Self { + let mut rng = rand::thread_rng(); + Self { + description: None, env: HashMap::new(), - command: String::new(), + command: Command::default(), path: None, - retry: false, keep_alive: true, retry_delay: 2, - before: vec![], - tasks: vec![], + before: Dependencies::default(), + tasks: HashMap::new(), + color: ( + rng.gen_range(100..255), + rng.gen_range(100..255), + rng.gen_range(100..255), + ), } } } -#[derive(Serialize, Deserialize, Default)] +/// Tasks are short lived jobs that can be used to perform +/// utility tasks or to setup a component. +#[derive(Serialize, Deserialize, Default, Clone)] #[serde(default)] pub struct TaskDefinition { + pub description: String, + /// A map of environment variables that are provided before running the task pub env: HashMap, - pub name: String, - pub command: String, + /// The command(s) to execute when the task is launched + pub command: Command, + /// The path to execute the task commands from. This can be relative from the conductor.yaml pub path: Option, - pub retry: bool, - pub keep_alive: bool, - pub retry_delay: u64, - pub before: Vec, + /// Other tasks that should be ran before running the task's command + pub before: Dependencies, +} + +impl From<&TaskDefinition> for Job { + fn from(source: &TaskDefinition) -> Self { + Self { + env: source.env.clone(), + command: source.command.clone(), + retry: false, + keep_alive: false, + retry_delay: 0, + ..Default::default() + } + } } #[cfg(test)] @@ -221,12 +341,13 @@ mod tests { #[test] fn component_by_name() { - let project = Project::from_str( + let project = Project::load_str( r#" - components: - test-component: {} - "#, - ); + components: + test-component: {} + "#, + ) + .unwrap(); let jobs = project.get_by_name("test-component").unwrap(); assert_eq!(jobs.len(), 1); @@ -234,12 +355,14 @@ mod tests { #[test] fn project_task() { - let project = Project::from_str( + let project = Project::load_str( r#" - tasks: - - name: task1 - "#, - ); + tasks: + task1: + command: pwd + "#, + ) + .unwrap(); let jobs = project.get_by_name("task1").unwrap(); assert_eq!(jobs.len(), 1); @@ -247,14 +370,16 @@ mod tests { #[test] fn component_task() { - let project = Project::from_str( + let project = Project::load_str( r#" - components: - c1: - tasks: - - name: task1 - "#, - ); + components: + c1: + tasks: + task1: + command: pwd + "#, + ) + .unwrap(); let jobs = project.get_by_name("c1:task1").unwrap(); assert_eq!(jobs.len(), 1); @@ -262,16 +387,17 @@ mod tests { #[test] fn component_dependent_absolute_component_task() { - let project = Project::from_str( + let project = Project::load_str( r#" - components: - main-cmp: - before: - - main-cmp:sub-task - tasks: - - name: sub-task - "#, - ); + components: + main-cmp: + before: main-cmp:sub-task + tasks: + sub-task: + command: pwd + "#, + ) + .unwrap(); let jobs = project.get_by_name("main-cmp").unwrap(); assert_eq!(jobs.len(), 2); @@ -280,16 +406,18 @@ mod tests { #[test] fn component_dependent_relative() { - let project = Project::from_str( + let project = Project::load_str( r#" - components: - main-cmp: - before: - - sub-task - tasks: - - name: sub-task - "#, - ); + components: + main-cmp: + before: + - sub-task + tasks: + sub-task: + command: pwd + "#, + ) + .unwrap(); let jobs = project.get_by_name("main-cmp").unwrap(); assert_eq!(jobs.len(), 2); @@ -298,29 +426,33 @@ mod tests { #[test] fn complicated_dependencies() { - let project = Project::from_str( + let project = Project::load_str( r#" - components: - ui: - before: - - build-ui - tasks: - - name: build-ui - server: - before: - - setup - tasks: - - name: setup - - name: build - before: - - server:setup - tasks: - - name: build - before: - - ui:build-ui - - server:build - "#, - ); + components: + ui: + before: + - build-ui + tasks: + build-ui: + command: pwd + server: + before: + - setup + tasks: + setup: + command: pwd + build: + command: pwd + before: + - server:setup + tasks: + build: + before: + - ui:build-ui + - server:build + "#, + ) + .unwrap(); let jobs = project.get_by_name("build").unwrap(); assert_eq!(jobs.get(0).unwrap().name, "ui:build-ui"); @@ -332,17 +464,18 @@ mod tests { #[test] fn component_env() { - let project = Project::from_str( + let project = Project::load_str( r#" - env: - foo: one - sub: two - components: - main-cmp: - env: - sub: three - "#, - ); + env: + foo: one + sub: two + components: + main-cmp: + env: + sub: three + "#, + ) + .unwrap(); let jobs = project.get_by_name("main-cmp").unwrap(); let job = jobs.first().unwrap(); @@ -352,24 +485,25 @@ mod tests { #[test] fn task_env() { - let project = Project::from_str( + let project = Project::load_str( r#" - env: - root: ten - foo: one - sub: two - components: - main-cmp: - env: - sub: three - tasks: - - name: t - env: - foo: four - "#, - ); + env: + root: ten + foo: one + sub: two + components: + main-cmp: + env: + sub: three + tasks: + subtask: + env: + foo: four + "#, + ) + .unwrap(); - let jobs = project.get_by_name("main-cmp:t").unwrap(); + let jobs = project.get_by_name("main-cmp:subtask").unwrap(); let job = jobs.first().unwrap(); assert_eq!(job.env.get("foo"), Some(&String::from("four"))); assert_eq!(job.env.get("sub"), Some(&String::from("three"))); @@ -378,40 +512,76 @@ mod tests { #[test] fn group() { - let project = Project::from_str( + let project = Project::load_str( r#" - groups: - - name: all - components: - - ui - - server - components: - ui: - before: - - build-ui - tasks: - - name: build-ui - server: - before: - - setup - tasks: - - name: setup - - name: build - before: - - server:setup - tasks: - - name: build - before: - - ui:build-ui - - server:build - "#, - ); - let jobs = project.get_by_name("all").unwrap(); - println!("{:?}", jobs); - assert_eq!(jobs.get(0).unwrap().name, "ui:build-ui"); - assert_eq!(jobs.get(1).unwrap().name, "ui"); - assert_eq!(jobs.get(2).unwrap().name, "server:setup"); - assert_eq!(jobs.get(3).unwrap().name, "server"); - assert_eq!(jobs.len(), 4); + groups: + all: + components: + - ui + - server + components: + ui: + before: + - build-ui + tasks: + build-ui: + command: pwd + server: + before: + - setup + tasks: + setup: + command: pwd + build: + command: pwd + before: + - server:setup + tasks: + build: + before: + - ui:build-ui + - server:build + "#, + ) + .unwrap(); + let plan = project.get_runplan("all").unwrap(); + assert_eq!(plan[0][0].name, "ui:build-ui"); + assert_eq!(plan[0][1].name, "ui"); + assert_eq!(plan[1][0].name, "server:setup"); + assert_eq!(plan[1][1].name, "server"); + assert_eq!(plan.len(), 2); + assert_eq!(plan[0].len(), 2); + assert_eq!(plan[1].len(), 2); + } + + #[test] + pub fn shorthand_single_command() { + let project = Project::load_str( + r#" + components: + test: + command: pwd + "#, + ) + .unwrap(); + let jobs = project.get_by_name("test").unwrap(); + assert_eq!(jobs[0].command.to_string(), String::from("pwd")); + } + + #[test] + pub fn multiple_commands() { + let project = Project::load_str( + r#" + components: + test: + command: + - sleep 1 + - pwd + + "#, + ) + .unwrap(); + let jobs = project.get_by_name("test").unwrap(); + assert_eq!(jobs[0].command.to_string(), String::from("sleep 1 && pwd")); } } diff --git a/src/job.rs b/src/job.rs index a17d36d..8a3dd62 100644 --- a/src/job.rs +++ b/src/job.rs @@ -1,10 +1,13 @@ use actix::Message; +use rand::Rng; use std::{ collections::HashMap, ops::{Deref, DerefMut}, }; -#[derive(Clone, Message)] +use crate::definition::Command; + +#[derive(Clone, Message, Debug)] #[rtype(result = "()")] pub struct Jobs(Vec); @@ -12,16 +15,13 @@ impl Jobs { pub fn new(tasks: Vec) -> Self { Self(tasks) } -} -impl std::fmt::Debug for Jobs { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Jobs") - .field( - "list", - &self.iter().map(|j| j.name.clone()).collect::>(), - ) - .finish() + pub fn pop_front(&mut self) -> Option { + if self.0.is_empty() { + None + } else { + Some(self.0.remove(0)) + } } } @@ -39,6 +39,15 @@ impl DerefMut for Jobs { } } +impl std::ops::Add for Jobs { + type Output = Jobs; + + fn add(mut self, mut rhs: Self) -> Self::Output { + self.0.append(&mut rhs.0); + self + } +} + impl From> for Jobs { fn from(fr: Vec) -> Self { Jobs::new(fr) @@ -49,24 +58,32 @@ impl From> for Jobs { #[rtype(result = "()")] pub struct Job { pub name: String, - pub command: String, + pub command: Command, pub path: String, pub env: HashMap, pub retry: bool, pub keep_alive: bool, pub retry_delay: u64, + pub color: (u8, u8, u8), } impl Default for Job { fn default() -> Self { + let mut rng = rand::thread_rng(); + Job { name: "Unnamed".to_string(), - command: "".to_string(), + command: "".into(), path: ".".to_string(), env: HashMap::new(), retry: true, keep_alive: true, retry_delay: 2, + color: ( + rng.gen_range(100..255), + rng.gen_range(100..255), + rng.gen_range(100..255), + ), } } } diff --git a/src/main.rs b/src/main.rs index f222c94..97255c8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,4 @@ use crate::definition::Project; -use actix::prelude::*; -use arkham::{App, Command}; use runner::Manager; use std::env; use std::path::{Path, PathBuf}; @@ -8,29 +6,48 @@ use std::path::{Path, PathBuf}; mod definition; mod job; mod runner; +mod term; #[actix_rt::main] async fn main() { - let project = Project::from_str( - r#" - components: - - name: ls - commands: - - ls - - name: currentdir - commands: - - pwd - "#, - ); - let jobs = project.get_by_name("ls").unwrap(); - let manager = Manager::new().start(); - manager.do_send(jobs); + if let Err(e) = run().await { + term::main_error(e); + } +} - find_config("conductor.yml"); +pub async fn run() -> anyhow::Result<()> { + let cfg_path = find_config("conductor.yml") + .ok_or_else(|| anyhow::anyhow!("No config file found. Create a conductor.yml.\nSee http://conductor.5sigma.io/articles/config"))?; + + std::env::set_current_dir(cfg_path.parent().unwrap())?; + + let config_str = std::fs::read_to_string(cfg_path)?; + let project = Project::load_str(&config_str)?; + + let args = std::env::args(); + + if args.len() == 1 { + term::help_text(&project); + return Ok(()); + } + + let plan = args.into_iter().skip(1).fold(vec![], |mut plan, arg| { + if let Some(mut j) = project.get_runplan(&arg) { + plan.append(&mut j) + } + plan + }); + + if plan.is_empty() { + return Err(anyhow::anyhow!("No tasks to run")); + } + + plan.into_iter().for_each(Manager::jobs); actix_rt::signal::ctrl_c() .await .expect("failed to listen for event"); + Ok(()) } fn find_config(config: &str) -> Option { diff --git a/src/runner.rs b/src/runner.rs index 0df7f0e..326e854 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -3,10 +3,12 @@ use std::collections::HashMap; use std::process::exit; use std::process::Stdio; use std::time::Duration; +use std::time::Instant; use actix::prelude::*; +use futures::AsyncReadExt; -use async_process::Child; +use super::term; use async_process::Command; use futures::io::AsyncBufReadExt; use futures::io::BufReader; @@ -14,10 +16,10 @@ use futures::io::BufReader; #[derive(Clone, Message, Debug)] #[rtype(result = "()")] pub enum Event { - StartedTask(i64, String), - Output(i64, String, String), - Error(i64, String, String), - FinishedTask(i64, String), + StartedTask(i64, Job), + Output(i64, Job, String), + Error(i64, Job, String), + FinishedTask(i64, Job, f64), Completed(i64), } @@ -26,8 +28,8 @@ pub struct Runner { pub id: i64, pub tasks: Jobs, pub manager: Recipient, - child: Option, pub current_job: Option, + pub current_job_start: Instant, } impl Runner { @@ -37,7 +39,7 @@ impl Runner { manager, id, current_job: None, - child: None, + current_job_start: Instant::now(), } } } @@ -46,7 +48,7 @@ impl Actor for Runner { type Context = Context; fn started(&mut self, ctx: &mut Self::Context) { - if let Some(task) = self.tasks.pop() { + if let Some(task) = self.tasks.pop_front() { ctx.notify(task); } else { ctx.stop(); @@ -58,13 +60,16 @@ impl Handler for Runner { type Result = (); fn handle(&mut self, job: Job, ctx: &mut Self::Context) -> Self::Result { self.current_job = Some(job.clone()); + self.current_job_start = Instant::now(); let _ = self .manager - .do_send(Event::StartedTask(self.id, job.name.clone())); + .do_send(Event::StartedTask(self.id, job.clone())); let mut child = Command::new("sh") .arg("-c") - .arg(&job.command) + .arg(&job.command.to_string()) + .envs(&job.env) + .current_dir(&job.path) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() @@ -74,7 +79,11 @@ impl Handler for Runner { .stdout .take() .expect("child did not have a handle to stdout"); - let reader = BufReader::new(stdout).lines(); + let stderr = child + .stderr + .take() + .expect("child did not have a handle to stdout"); + ctx.add_stream(BufReader::new(stdout.chain(stderr)).lines()); let fut = async move { child .status() @@ -84,40 +93,31 @@ impl Handler for Runner { let fut = actix::fut::wrap_future::<_, Self>(fut).map(move |status, _, ctx| { if job.keep_alive && status.code() == Some(0) { let delay = Duration::from_secs(job.retry_delay); - arkham::vox::header(format!("Restarting {} in {}s", job.name, delay.as_secs())); + term::header(&job.name, &format!("Restarting in {}s", delay.as_secs())); ctx.notify_later(job, delay); } }); ctx.spawn(fut); - ctx.add_stream(reader); } } impl StreamHandler> for Runner { fn handle(&mut self, item: Result, _: &mut Self::Context) { - let ev = match item { - Ok(v) => Event::Output( - self.id, - self.current_job - .as_ref() - .map(|i| i.name.clone()) - .unwrap_or_else(|| String::from("UNKNOWN")), - v, - ), - Err(e) => Event::Error( - self.id, - self.current_job.as_ref().unwrap().name.clone(), - e.to_string(), - ), - }; - let _ = self.manager.do_send(ev); + if let Some(ref job) = self.current_job { + let ev = match item { + Ok(v) => Event::Output(self.id, job.clone(), v), + Err(e) => Event::Error(self.id, job.clone(), e.to_string()), + }; + let _ = self.manager.do_send(ev); + } } fn finished(&mut self, ctx: &mut Self::Context) { let _ = self.manager.do_send(Event::FinishedTask( self.id, - self.current_job.as_ref().unwrap().name.clone(), + self.current_job.clone().unwrap_or_default(), + self.current_job_start.elapsed().as_secs_f64(), )); if let Some(task) = self.tasks.pop() { self.current_job = None; @@ -141,11 +141,14 @@ pub struct Manager { } impl Manager { - pub fn new() -> Self { - Self::default() + pub fn jobs(jobs: Jobs) { + Self::from_registry().do_send(jobs); } } +impl Supervised for Manager {} +impl SystemService for Manager {} + impl Actor for Manager { type Context = Context; } @@ -155,17 +158,17 @@ impl Handler for Manager { fn handle(&mut self, msg: Event, ctx: &mut Self::Context) -> Self::Result { match msg { - Event::StartedTask(_, name) => { - arkham::vox::header(format!("{} - Started", name)); + Event::StartedTask(_, job) => { + term::header(job.name, "Started"); } - Event::Output(_, name, v) => { - println!("[{}] {}", name, v) + Event::Output(_, job, v) => { + term::output(job.name, job.color, v); } - Event::Error(_, v, name) => { - println!("[{}] Error: {}", name, v); + Event::Error(_, job, v) => { + println!("[{}: {}]", job.name, v); } - Event::FinishedTask(_, name) => { - arkham::vox::header(format!("{} - Finished", name)); + Event::FinishedTask(_, job, time) => { + term::header(job.name, &format!("Finished ({:.2}s)", time)); } Event::Completed(id) => { self.runners.remove(&id); diff --git a/src/term.rs b/src/term.rs new file mode 100644 index 0000000..2eafe3d --- /dev/null +++ b/src/term.rs @@ -0,0 +1,84 @@ +use std::{collections::HashMap, fmt::Display}; + +use ansi_term::Color; + +use crate::definition::{Group, Project}; + +pub fn header>(name: T, msg: &str) { + let text = format!(" - --=[ {: <20} {: >20} ]=-- -", name.into(), msg); + println!("{}", Color::White.bold().paint(text)); +} + +pub fn main_error(error: T) { + println!("{}\n{}", Color::Red.bold().paint("Error:"), error); +} + +pub fn output(name: T, color: (u8, u8, u8), value: T) { + let n = Color::RGB(color.0, color.1, color.2).paint(format!("{}", name)); + println!("[{}] - {}", n, value); +} + +pub fn help_text(project: &Project) { + println!( + "{} {}", + Color::White.paint("Conductor"), + Color::White.dimmed().paint(env!("CARGO_PKG_VERSION")) + ); + item_list(project); +} + +pub fn item_list(project: &Project) { + println!("\n{}", ansi_term::Color::White.bold().paint("GROUPS")); + if project.groups.is_empty() { + println!("{}", Color::White.dimmed().paint("no groups defined")); + } else { + for (name, group) in sort_map(&project.groups).iter() { + print_group(name, group); + } + } + + println!("\n{}", ansi_term::Color::White.bold().paint("COMPONENTS")); + if project.components.is_empty() { + println!("{}", Color::White.dimmed().paint("no components defined")); + } else { + for (name, _component) in sort_map(&project.components).iter() { + println!("{}", name); + } + } + + println!("\n{}", ansi_term::Color::White.bold().paint("TASKS")); + if project.tasks.is_empty() && project.components.values().all(|c| c.tasks.is_empty()) { + println!("{}", Color::White.dimmed().paint("no tasks defined")); + } else { + for (name, _tasks) in sort_map(&project.tasks).iter() { + println!("{}", name); + } + for (c_name, component) in sort_map(&project.components).iter() { + for (t_name, _tasks) in sort_map(&component.tasks).iter() { + println!("{}:{}", c_name, t_name); + } + } + } +} + +fn sort_map(map: &HashMap) -> Vec<(String, V)> +where + V: Clone, +{ + let mut items: Vec<(String, V)> = map.iter().map(|v| (v.0.clone(), v.1.clone())).collect(); + items.sort_by_key(|i| i.0.clone()); + items +} + +fn print_group(name: &str, group: &Group) { + if let Some(ref desc) = group.description { + println!("{: <31}{}", name, desc); + } else { + println!( + "{: <31}{}: {}", + name, + Color::White.bold().paint("components"), + group.components.join(",") + ); + } +}