From 8f1916b5cc82a8e6eeb7966fbf8651a7244bd74d Mon Sep 17 00:00:00 2001 From: Zhaofeng Li Date: Sun, 29 Jan 2023 12:01:54 -0700 Subject: [PATCH] client: Refactor pushing to use a job queue --- Cargo.lock | 21 +++ client/Cargo.toml | 1 + client/src/command/push.rs | 211 ++------------------------- client/src/main.rs | 1 + client/src/push.rs | 285 +++++++++++++++++++++++++++++++++++++ 5 files changed, 322 insertions(+), 197 deletions(-) create mode 100644 client/src/push.rs diff --git a/Cargo.lock b/Cargo.lock index ca404bc..dce199b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -76,6 +76,17 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" +[[package]] +name = "async-channel" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + [[package]] name = "async-compression" version = "0.3.15" @@ -170,6 +181,7 @@ name = "attic-client" version = "0.1.0" dependencies = [ "anyhow", + "async-channel", "attic", "bytes", "clap 4.0.32", @@ -1033,6 +1045,15 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "concurrent-queue" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c278839b831783b70278b14df4d45e1beb1aad306c07bb796637de9a0e323e8e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console" version = "0.15.3" diff --git a/client/Cargo.toml b/client/Cargo.toml index 62ede81..9eabfe0 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -12,6 +12,7 @@ path = "src/main.rs" attic = { path = "../attic" } anyhow = "1.0.68" +async-channel = "1.8.0" bytes = "1.3.0" clap = { version = "4.0", features = ["derive"] } clap_complete = "4.0.2" diff --git a/client/src/command/push.rs b/client/src/command/push.rs index 0fd5a4d..8b6e7a0 100644 --- a/client/src/command/push.rs +++ b/client/src/command/push.rs @@ -1,25 +1,18 @@ use std::collections::{HashMap, HashSet}; -use std::fmt::Write; +use std::cmp; use std::path::PathBuf; -use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; use anyhow::{anyhow, Result}; -use bytes::Bytes; use clap::Parser; use futures::future::join_all; -use futures::stream::{Stream, TryStreamExt}; -use indicatif::{HumanBytes, MultiProgress, ProgressBar, ProgressState, ProgressStyle}; -use tokio::sync::Semaphore; +use indicatif::MultiProgress; use crate::api::ApiClient; use crate::cache::{CacheName, CacheRef}; use crate::cli::Opts; use crate::config::Config; -use attic::api::v1::upload_path::{UploadPathNarInfo, UploadPathResult, UploadPathResultKind}; -use attic::error::AtticResult; +use crate::push::{Pusher, PushConfig}; use attic::nix_store::{NixStore, StorePath, StorePathHash, ValidPathInfo}; /// Push closures to a binary cache. @@ -62,132 +55,6 @@ struct PushPlan { num_upstream: usize, } -/// Wrapper to update a progress bar as a NAR is streamed. -struct NarStreamProgress { - stream: S, - bar: ProgressBar, -} - -/// Uploads a single path to a cache. -pub async fn upload_path( - store: Arc, - path_info: ValidPathInfo, - api: ApiClient, - cache: &CacheName, - mp: MultiProgress, - force_preamble: bool, -) -> Result<()> { - let path = &path_info.path; - let upload_info = { - let full_path = store - .get_full_path(path) - .to_str() - .ok_or_else(|| anyhow!("Path contains non-UTF-8"))? - .to_string(); - - let references = path_info - .references - .into_iter() - .map(|pb| { - pb.to_str() - .ok_or_else(|| anyhow!("Reference contains non-UTF-8")) - .map(|s| s.to_owned()) - }) - .collect::, anyhow::Error>>()?; - - UploadPathNarInfo { - cache: cache.to_owned(), - store_path_hash: path.to_hash(), - store_path: full_path, - references, - system: None, // TODO - deriver: None, // TODO - sigs: path_info.sigs, - ca: path_info.ca, - nar_hash: path_info.nar_hash.to_owned(), - nar_size: path_info.nar_size as usize, - } - }; - - let template = format!( - "{{spinner}} {: <20.20} {{bar:40.green/blue}} {{human_bytes:10}} ({{average_speed}})", - path.name(), - ); - let style = ProgressStyle::with_template(&template) - .unwrap() - .tick_chars("🕛🕐🕑🕒🕓🕔🕕🕖🕗🕘🕙🕚✅") - .progress_chars("██ ") - .with_key("human_bytes", |state: &ProgressState, w: &mut dyn Write| { - write!(w, "{}", HumanBytes(state.pos())).unwrap(); - }) - // Adapted from - // - .with_key( - "average_speed", - |state: &ProgressState, w: &mut dyn Write| match (state.pos(), state.elapsed()) { - (pos, elapsed) if elapsed > Duration::ZERO => { - write!(w, "{}", average_speed(pos, elapsed)).unwrap(); - } - _ => write!(w, "-").unwrap(), - }, - ); - let bar = mp.add(ProgressBar::new(path_info.nar_size)); - bar.set_style(style); - let nar_stream = NarStreamProgress::new(store.nar_from_path(path.to_owned()), bar.clone()) - .map_ok(Bytes::from); - - let start = Instant::now(); - match api - .upload_path(upload_info, nar_stream, force_preamble) - .await - { - Ok(r) => { - let r = r.unwrap_or(UploadPathResult { - kind: UploadPathResultKind::Uploaded, - file_size: None, - frac_deduplicated: None, - }); - - let info_string: String = match r.kind { - UploadPathResultKind::Deduplicated => "deduplicated".to_string(), - _ => { - let elapsed = start.elapsed(); - let seconds = elapsed.as_secs_f64(); - let speed = (path_info.nar_size as f64 / seconds) as u64; - - let mut s = format!("{}/s", HumanBytes(speed)); - - if let Some(frac_deduplicated) = r.frac_deduplicated { - if frac_deduplicated > 0.01f64 { - s += &format!(", {:.1}% deduplicated", frac_deduplicated * 100.0); - } - } - - s - } - }; - - mp.suspend(|| { - eprintln!( - "✅ {} ({})", - path.as_os_str().to_string_lossy(), - info_string - ); - }); - bar.finish_and_clear(); - - Ok(()) - } - Err(e) => { - mp.suspend(|| { - eprintln!("❌ {}: {}", path.as_os_str().to_string_lossy(), e); - }); - bar.finish_and_clear(); - Err(e) - } - } -} - pub async fn run(opts: Opts) -> Result<()> { let sub = opts.command.as_push().unwrap(); if sub.jobs == 0 { @@ -239,40 +106,20 @@ pub async fn run(opts: Opts) -> Result<()> { ); } + let push_config = PushConfig { + num_workers: cmp::min(sub.jobs, plan.store_path_map.len()), + force_preamble: sub.force_preamble, + }; + let mp = MultiProgress::new(); - let upload_limit = Arc::new(Semaphore::new(sub.jobs)); - let futures = plan - .store_path_map - .into_iter() - .map(|(_, path_info)| { - let store = store.clone(); - let api = api.clone(); - let mp = mp.clone(); - let upload_limit = upload_limit.clone(); - async move { - let permit = upload_limit.acquire().await?; + let pusher = Pusher::new(store, api, cache.to_owned(), mp, push_config); + for (_, path_info) in plan.store_path_map { + pusher.push(path_info).await?; + } - upload_path( - store.clone(), - path_info, - api, - cache, - mp.clone(), - sub.force_preamble, - ) - .await?; - - drop(permit); - Ok::<(), anyhow::Error>(()) - } - }) - .collect::>(); - - futures::future::join_all(futures) - .await - .into_iter() - .collect::>>()?; + let results = pusher.wait().await; + results.into_iter().map(|(_, result)| result).collect::>>()?; Ok(()) } @@ -376,33 +223,3 @@ impl PushPlan { }) } } - -impl>>> NarStreamProgress { - fn new(stream: S, bar: ProgressBar) -> Self { - Self { stream, bar } - } -} - -impl>> + Unpin> Stream for NarStreamProgress { - type Item = AtticResult>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match Pin::new(&mut self.stream).as_mut().poll_next(cx) { - Poll::Ready(Some(data)) => { - if let Ok(data) = &data { - self.bar.inc(data.len() as u64); - } - - Poll::Ready(Some(data)) - } - other => other, - } - } -} - -// Just the average, no fancy sliding windows that cause wild fluctuations -// -fn average_speed(bytes: u64, duration: Duration) -> String { - let speed = bytes as f64 * 1000_f64 / duration.as_millis() as f64; - format!("{}/s", HumanBytes(speed as u64)) -} diff --git a/client/src/main.rs b/client/src/main.rs index 1eb4765..ca85460 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -20,6 +20,7 @@ mod command; mod config; mod nix_config; mod nix_netrc; +mod push; mod version; use anyhow::Result; diff --git a/client/src/push.rs b/client/src/push.rs new file mode 100644 index 0000000..75cd007 --- /dev/null +++ b/client/src/push.rs @@ -0,0 +1,285 @@ +//! Store path uploader. +//! +//! Multiple workers are spawned to upload store paths concurrently. +//! +//! TODO: Refactor out progress reporting and support a simple output style without progress bars + +use std::collections::HashMap; +use std::fmt::Write; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; + +use anyhow::{anyhow, Result}; +use async_channel as channel; +use bytes::Bytes; +use futures::stream::{Stream, TryStreamExt}; +use futures::future::join_all; +use indicatif::{HumanBytes, MultiProgress, ProgressBar, ProgressState, ProgressStyle}; +use tokio::task::{JoinHandle, spawn}; + +use attic::api::v1::upload_path::{UploadPathNarInfo, UploadPathResult, UploadPathResultKind}; +use attic::cache::CacheName; +use attic::error::AtticResult; +use attic::nix_store::{NixStore, StorePath, ValidPathInfo}; +use crate::api::ApiClient; + +type JobSender = channel::Sender; +type JobReceiver = channel::Receiver; + +/// Configuration for pushing store paths. +#[derive(Clone, Copy, Debug)] +pub struct PushConfig { + /// The number of workers to spawn. + pub num_workers: usize, + + /// Whether to always include the upload info in the PUT payload. + pub force_preamble: bool, +} + +/// A handle to push store paths to a cache. +/// +/// The caller is responsible for computing closures and +/// checking for paths that already exist on the remote +/// cache. +pub struct Pusher { + workers: Vec>>>, + sender: JobSender, +} + +/// Wrapper to update a progress bar as a NAR is streamed. +struct NarStreamProgress { + stream: S, + bar: ProgressBar, +} + +impl Pusher { + pub fn new(store: Arc, api: ApiClient, cache: CacheName, mp: MultiProgress, config: PushConfig) -> Self { + let (sender, receiver) = channel::unbounded(); + let mut workers = Vec::new(); + + for _ in 0..config.num_workers { + workers.push(spawn(worker( + receiver.clone(), + store.clone(), + api.clone(), + cache.clone(), + mp.clone(), + config.clone(), + ))); + } + + Self { workers, sender } + } + + /// Sends a path to be pushed. + pub async fn push(&self, path_info: ValidPathInfo) -> Result<()> { + self.sender.send(path_info).await + .map_err(|e| anyhow!(e)) + } + + /// Waits for all workers to terminate, returning all results. + /// + /// TODO: Stream the results with another channel + pub async fn wait(self) -> HashMap> { + drop(self.sender); + + let results = join_all(self.workers) + .await + .into_iter() + .map(|joinresult| joinresult.unwrap()) + .fold(HashMap::new(), |mut acc, results| { + acc.extend(results); + acc + }); + + results + } +} + +async fn worker( + receiver: JobReceiver, + store: Arc, + api: ApiClient, + cache: CacheName, + mp: MultiProgress, + config: PushConfig, +) -> HashMap> { + let mut results = HashMap::new(); + + loop { + let path_info = match receiver.recv().await { + Ok(path_info) => path_info, + Err(_) => { + // channel is closed - we are done + break; + } + }; + + let store_path = path_info.path.clone(); + + let r = upload_path( + path_info, + store.clone(), + api.clone(), + &cache, + mp.clone(), + config.force_preamble, + ).await; + + results.insert(store_path, r); + } + + results +} + +/// Uploads a single path to a cache. +pub async fn upload_path( + path_info: ValidPathInfo, + store: Arc, + api: ApiClient, + cache: &CacheName, + mp: MultiProgress, + force_preamble: bool, +) -> Result<()> { + let path = &path_info.path; + let upload_info = { + let full_path = store + .get_full_path(path) + .to_str() + .ok_or_else(|| anyhow!("Path contains non-UTF-8"))? + .to_string(); + + let references = path_info + .references + .into_iter() + .map(|pb| { + pb.to_str() + .ok_or_else(|| anyhow!("Reference contains non-UTF-8")) + .map(|s| s.to_owned()) + }) + .collect::, anyhow::Error>>()?; + + UploadPathNarInfo { + cache: cache.to_owned(), + store_path_hash: path.to_hash(), + store_path: full_path, + references, + system: None, // TODO + deriver: None, // TODO + sigs: path_info.sigs, + ca: path_info.ca, + nar_hash: path_info.nar_hash.to_owned(), + nar_size: path_info.nar_size as usize, + } + }; + + let template = format!( + "{{spinner}} {: <20.20} {{bar:40.green/blue}} {{human_bytes:10}} ({{average_speed}})", + path.name(), + ); + let style = ProgressStyle::with_template(&template) + .unwrap() + .tick_chars("🕛🕐🕑🕒🕓🕔🕕🕖🕗🕘🕙🕚✅") + .progress_chars("██ ") + .with_key("human_bytes", |state: &ProgressState, w: &mut dyn Write| { + write!(w, "{}", HumanBytes(state.pos())).unwrap(); + }) + // Adapted from + // + .with_key( + "average_speed", + |state: &ProgressState, w: &mut dyn Write| match (state.pos(), state.elapsed()) { + (pos, elapsed) if elapsed > Duration::ZERO => { + write!(w, "{}", average_speed(pos, elapsed)).unwrap(); + } + _ => write!(w, "-").unwrap(), + }, + ); + let bar = mp.add(ProgressBar::new(path_info.nar_size)); + bar.set_style(style); + let nar_stream = NarStreamProgress::new(store.nar_from_path(path.to_owned()), bar.clone()) + .map_ok(Bytes::from); + + let start = Instant::now(); + match api + .upload_path(upload_info, nar_stream, force_preamble) + .await + { + Ok(r) => { + let r = r.unwrap_or(UploadPathResult { + kind: UploadPathResultKind::Uploaded, + file_size: None, + frac_deduplicated: None, + }); + + let info_string: String = match r.kind { + UploadPathResultKind::Deduplicated => "deduplicated".to_string(), + _ => { + let elapsed = start.elapsed(); + let seconds = elapsed.as_secs_f64(); + let speed = (path_info.nar_size as f64 / seconds) as u64; + + let mut s = format!("{}/s", HumanBytes(speed)); + + if let Some(frac_deduplicated) = r.frac_deduplicated { + if frac_deduplicated > 0.01f64 { + s += &format!(", {:.1}% deduplicated", frac_deduplicated * 100.0); + } + } + + s + } + }; + + mp.suspend(|| { + eprintln!( + "✅ {} ({})", + path.as_os_str().to_string_lossy(), + info_string + ); + }); + bar.finish_and_clear(); + + Ok(()) + } + Err(e) => { + mp.suspend(|| { + eprintln!("❌ {}: {}", path.as_os_str().to_string_lossy(), e); + }); + bar.finish_and_clear(); + Err(e) + } + } +} + +impl>>> NarStreamProgress { + fn new(stream: S, bar: ProgressBar) -> Self { + Self { stream, bar } + } +} + +impl>> + Unpin> Stream for NarStreamProgress { + type Item = AtticResult>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Pin::new(&mut self.stream).as_mut().poll_next(cx) { + Poll::Ready(Some(data)) => { + if let Ok(data) = &data { + self.bar.inc(data.len() as u64); + } + + Poll::Ready(Some(data)) + } + other => other, + } + } +} + +// Just the average, no fancy sliding windows that cause wild fluctuations +// +fn average_speed(bytes: u64, duration: Duration) -> String { + let speed = bytes as f64 * 1000_f64 / duration.as_millis() as f64; + format!("{}/s", HumanBytes(speed as u64)) +}