
client/push: Use PushSession for --stdin

Instead of eagerly consuming stdin, read line-by-line and feed into
PushSession. This allows for a `nix-build | attic push` workflow.
Zhaofeng Li 2024-10-04 09:16:22 -06:00
parent 99f3fbdc3d
commit c5764fca3b
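
As background for the diff below, the core of the new `--stdin` path is tokio's buffered, asynchronous line reading: each store path is queued as soon as `nix-build` prints it, instead of only after stdin has been fully drained. The following is a minimal, self-contained sketch of that pattern only (assuming tokio with its full feature set); `queue_path` is a hypothetical stand-in for handing a path to the real `PushSession`.

```rust
use tokio::io::{self, AsyncBufReadExt, BufReader};

// Hypothetical stand-in for feeding a path into a PushSession;
// the real session batches paths and uploads them in the background.
fn queue_path(path: &str) {
    println!("queued: {path}");
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Wrap stdin in a buffered async reader and consume it line by line.
    let stdin = BufReader::new(io::stdin());
    let mut lines = stdin.lines();

    // next_line() yields each line as soon as it is available, so paths
    // can be queued while the producer (e.g. `nix-build`) is still running.
    while let Some(line) = lines.next_line().await? {
        let path = line.trim();
        if !path.is_empty() {
            queue_path(path);
        }
    }

    Ok(())
}
```

Piping a producer into it (for example, something like `nix-build | cargo run`) shows lines being consumed as they appear rather than at the end.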


@@ -1,17 +1,16 @@
-use std::io;
-use std::io::BufRead;
 use std::path::PathBuf;
 use std::sync::Arc;
 
 use anyhow::{anyhow, Result};
 use clap::Parser;
 use indicatif::MultiProgress;
+use tokio::io::{self, AsyncBufReadExt, BufReader};
 
 use crate::api::ApiClient;
-use crate::cache::CacheRef;
+use crate::cache::{CacheName, CacheRef, ServerName};
 use crate::cli::Opts;
 use crate::config::Config;
-use crate::push::{PushConfig, Pusher};
+use crate::push::{PushConfig, PushSessionConfig, Pusher};
 use attic::nix_store::NixStore;
 
 /// Push closures to a binary cache.
@@ -47,6 +46,79 @@ pub struct Push {
     force_preamble: bool,
 }
 
+struct PushContext {
+    store: Arc<NixStore>,
+    cache_name: CacheName,
+    server_name: ServerName,
+    pusher: Pusher,
+    no_closure: bool,
+    ignore_upstream_cache_filter: bool,
+}
+
+impl PushContext {
+    async fn push_static(self, paths: Vec<PathBuf>) -> Result<()> {
+        let roots = paths
+            .into_iter()
+            .map(|p| self.store.follow_store_path(p))
+            .collect::<std::result::Result<Vec<_>, _>>()?;
+
+        let plan = self
+            .pusher
+            .plan(roots, self.no_closure, self.ignore_upstream_cache_filter)
+            .await?;
+
+        if plan.store_path_map.is_empty() {
+            if plan.num_all_paths == 0 {
+                eprintln!("🤷 Nothing selected.");
+            } else {
+                eprintln!(
+                    "✅ All done! ({num_already_cached} already cached, {num_upstream} in upstream)",
+                    num_already_cached = plan.num_already_cached,
+                    num_upstream = plan.num_upstream,
+                );
+            }
+
+            return Ok(());
+        } else {
+            eprintln!("⚙️ Pushing {num_missing_paths} paths to \"{cache}\" on \"{server}\" ({num_already_cached} already cached, {num_upstream} in upstream)...",
+                cache = self.cache_name.as_str(),
+                server = self.server_name.as_str(),
+                num_missing_paths = plan.store_path_map.len(),
+                num_already_cached = plan.num_already_cached,
+                num_upstream = plan.num_upstream,
+            );
+        }
+
+        for (_, path_info) in plan.store_path_map {
+            self.pusher.queue(path_info).await?;
+        }
+
+        let results = self.pusher.wait().await;
+        results.into_values().collect::<Result<Vec<()>>>()?;
+
+        Ok(())
+    }
+
+    async fn push_stdin(self) -> Result<()> {
+        let session = self.pusher.into_push_session(PushSessionConfig {
+            no_closure: self.no_closure,
+            ignore_upstream_cache_filter: self.ignore_upstream_cache_filter,
+        });
+
+        let stdin = BufReader::new(io::stdin());
+        let mut lines = stdin.lines();
+        while let Some(line) = lines.next_line().await? {
+            let path = self.store.follow_store_path(line)?;
+            session.queue_many(vec![path])?;
+        }
+
+        let results = session.wait().await?;
+        results.into_values().collect::<Result<Vec<()>>>()?;
+
+        Ok(())
+    }
+}
+
 pub async fn run(opts: Opts) -> Result<()> {
     let sub = opts.command.as_push().unwrap();
     if sub.jobs == 0 {
@@ -56,27 +128,13 @@ pub async fn run(opts: Opts) -> Result<()> {
     let config = Config::load()?;
     let store = Arc::new(NixStore::connect()?);
 
-    let roots = if sub.stdin {
-        io::stdin()
-            .lock()
-            .lines()
-            .flatten()
-            .map(|p| store.follow_store_path(p.trim()))
-            .collect::<std::result::Result<Vec<_>, _>>()?
-    } else {
-        sub.paths
-            .clone()
-            .into_iter()
-            .map(|p| store.follow_store_path(p))
-            .collect::<std::result::Result<Vec<_>, _>>()?
-    };
-
-    let (server_name, server, cache) = config.resolve_cache(&sub.cache)?;
+    let (server_name, server, cache_name) = config.resolve_cache(&sub.cache)?;
     let mut api = ApiClient::from_server_config(server.clone())?;
 
     // Confirm remote cache validity, query cache config
-    let cache_config = api.get_cache_config(cache).await?;
+    let cache_config = api.get_cache_config(cache_name).await?;
 
     if let Some(api_endpoint) = &cache_config.api_endpoint {
         // Use delegated API endpoint
@@ -90,39 +148,29 @@ pub async fn run(opts: Opts) -> Result<()> {
     let mp = MultiProgress::new();
 
-    let pusher = Pusher::new(store, api, cache.to_owned(), cache_config, mp, push_config);
-    let plan = pusher
-        .plan(roots, sub.no_closure, sub.ignore_upstream_cache_filter)
-        .await?;
+    let pusher = Pusher::new(
+        store.clone(),
+        api,
+        cache_name.to_owned(),
+        cache_config,
+        mp,
+        push_config,
+    );
 
-    if plan.store_path_map.is_empty() {
-        if plan.num_all_paths == 0 {
-            eprintln!("🤷 Nothing selected.");
-        } else {
-            eprintln!(
-                "✅ All done! ({num_already_cached} already cached, {num_upstream} in upstream)",
-                num_already_cached = plan.num_already_cached,
-                num_upstream = plan.num_upstream,
-            );
-        }
+    let push_ctx = PushContext {
+        store,
+        cache_name: cache_name.clone(),
+        server_name: server_name.clone(),
+        pusher,
+        no_closure: sub.no_closure,
+        ignore_upstream_cache_filter: sub.ignore_upstream_cache_filter,
+    };
 
-        return Ok(());
+    if sub.stdin {
+        push_ctx.push_stdin().await?;
     } else {
-        eprintln!("⚙️ Pushing {num_missing_paths} paths to \"{cache}\" on \"{server}\" ({num_already_cached} already cached, {num_upstream} in upstream)...",
-            cache = cache.as_str(),
-            server = server_name.as_str(),
-            num_missing_paths = plan.store_path_map.len(),
-            num_already_cached = plan.num_already_cached,
-            num_upstream = plan.num_upstream,
-        );
+        push_ctx.push_static(sub.paths.clone()).await?;
     }
 
-    for (_, path_info) in plan.store_path_map {
-        pusher.queue(path_info).await?;
-    }
-
-    let results = pusher.wait().await;
-    results.into_values().collect::<Result<Vec<()>>>()?;
-
     Ok(())
 }