diff --git a/Cargo.lock b/Cargo.lock index 87cddb9..1589b24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -153,6 +153,7 @@ dependencies = [ "regex", "serde", "serde_json", + "serde_with", "serde_yaml", "sha2", "tempfile", diff --git a/attic/Cargo.toml b/attic/Cargo.toml index 5f4f888..e1e4926 100644 --- a/attic/Cargo.toml +++ b/attic/Cargo.toml @@ -17,6 +17,7 @@ nix-base32 = { git = "https://github.com/zhaofengli/nix-base32.git", rev = "b850 regex = "1.7.0" serde = { version = "1.0.151", features = ["derive"] } serde_yaml = "0.9.16" +serde_with = "2.1.0" sha2 = "0.10.6" tempfile = "3" wildmatch = "2.1.1" diff --git a/attic/src/api/v1/upload_path.rs b/attic/src/api/v1/upload_path.rs index 03c2667..10f7bcc 100644 --- a/attic/src/api/v1/upload_path.rs +++ b/attic/src/api/v1/upload_path.rs @@ -1,4 +1,5 @@ use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DefaultOnError}; use crate::cache::CacheName; use crate::hash::Hash; @@ -50,3 +51,26 @@ pub struct UploadPathNarInfo { /// The size of the NAR. pub nar_size: usize, } + +#[serde_as] +#[derive(Debug, Serialize, Deserialize)] +pub struct UploadPathResult { + #[serde_as(deserialize_as = "DefaultOnError")] + pub kind: UploadPathResultKind, +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[non_exhaustive] +pub enum UploadPathResultKind { + /// The path was uploaded. + Uploaded, + + /// The path was globally deduplicated. 
+ Deduplicated, +} + +impl Default for UploadPathResultKind { + fn default() -> Self { + Self::Uploaded + } +} diff --git a/client/src/api/mod.rs b/client/src/api/mod.rs index 65a9c49..279e3e5 100644 --- a/client/src/api/mod.rs +++ b/client/src/api/mod.rs @@ -16,7 +16,7 @@ use crate::config::ServerConfig; use crate::version::ATTIC_DISTRIBUTOR; use attic::api::v1::cache_config::{CacheConfig, CreateCacheRequest}; use attic::api::v1::get_missing_paths::{GetMissingPathsRequest, GetMissingPathsResponse}; -use attic::api::v1::upload_path::UploadPathNarInfo; +use attic::api::v1::upload_path::{UploadPathNarInfo, UploadPathResult}; use attic::cache::CacheName; use attic::nix_store::StorePathHash; @@ -155,7 +155,11 @@ impl ApiClient { } /// Uploads a path. - pub async fn upload_path<S>(&self, nar_info: UploadPathNarInfo, stream: S) -> Result<()> + pub async fn upload_path<S>( + &self, + nar_info: UploadPathNarInfo, + stream: S, + ) -> Result<Option<UploadPathResult>> where S: TryStream + Send + Sync + 'static, S::Error: Into<Box<dyn std::error::Error + Send + Sync>>, @@ -177,7 +181,10 @@ impl ApiClient { .await?; if res.status().is_success() { - Ok(()) + match res.json().await { + Ok(r) => Ok(Some(r)), + Err(_) => Ok(None), + } } else { let api_error = ApiError::try_from_response(res).await?; Err(api_error.into()) diff --git a/client/src/command/push.rs b/client/src/command/push.rs index ec84475..07e504c 100644 --- a/client/src/command/push.rs +++ b/client/src/command/push.rs @@ -17,7 +17,7 @@ use crate::api::ApiClient; use crate::cache::{CacheName, CacheRef}; use crate::cli::Opts; use crate::config::Config; -use attic::api::v1::upload_path::UploadPathNarInfo; +use attic::api::v1::upload_path::{UploadPathNarInfo, UploadPathResultKind}; use attic::error::AtticResult; use attic::nix_store::{NixStore, StorePath, StorePathHash, ValidPathInfo}; @@ -131,19 +131,39 @@ pub async fn upload_path( let start = Instant::now(); match api.upload_path(upload_info, nar_stream).await { - Ok(_) => { - let elapsed = start.elapsed(); - let seconds = 
elapsed.as_secs_f64(); - let speed = (path_info.nar_size as f64 / seconds) as u64; + Ok(r) => { + if r.is_none() { + mp.suspend(|| { + eprintln!("Warning: Please update your server. Compatibility will be removed in the first stable release."); + }) + } + + let deduplicated = if let Some(r) = r { + r.kind == UploadPathResultKind::Deduplicated + } else { + false + }; + + if deduplicated { + mp.suspend(|| { + eprintln!("✅ {} (deduplicated)", path.as_os_str().to_string_lossy()); + }); + bar.finish_and_clear(); + } else { + let elapsed = start.elapsed(); + let seconds = elapsed.as_secs_f64(); + let speed = (path_info.nar_size as f64 / seconds) as u64; + + mp.suspend(|| { + eprintln!( + "✅ {} ({}/s)", + path.as_os_str().to_string_lossy(), + HumanBytes(speed) + ); + }); + bar.finish_and_clear(); + } - mp.suspend(|| { - eprintln!( - "✅ {} ({}/s)", - path.as_os_str().to_string_lossy(), - HumanBytes(speed) - ); - }); - bar.finish_and_clear(); Ok(()) } Err(e) => { diff --git a/server/src/api/v1/upload_path.rs b/server/src/api/v1/upload_path.rs index efa2ed8..b01a1e0 100644 --- a/server/src/api/v1/upload_path.rs +++ b/server/src/api/v1/upload_path.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use anyhow::anyhow; use async_compression::tokio::bufread::{BrotliEncoder, XzEncoder, ZstdEncoder}; use axum::{ - extract::{BodyStream, Extension}, + extract::{BodyStream, Extension, Json}, http::HeaderMap, }; use chrono::Utc; @@ -26,7 +26,7 @@ use crate::config::CompressionType; use crate::error::{ErrorKind, ServerError, ServerResult}; use crate::narinfo::Compression; use crate::{RequestState, State}; -use attic::api::v1::upload_path::UploadPathNarInfo; +use attic::api::v1::upload_path::{UploadPathNarInfo, UploadPathResult, UploadPathResultKind}; use attic::hash::Hash; use attic::stream::StreamHasher; use attic::util::Finally; @@ -34,7 +34,7 @@ use attic::util::Finally; use crate::database::entity::cache; use crate::database::entity::nar::{self, Entity as Nar, NarState}; use 
crate::database::entity::object::{self, Entity as Object}; -use crate::database::entity::Json; +use crate::database::entity::Json as DbJson; use crate::database::{AtticDatabase, NarGuard}; type CompressorFn<C> = Box<dyn FnOnce(C) -> Box<dyn AsyncRead + Unpin + Send> + Send>; @@ -84,7 +84,7 @@ pub(crate) async fn upload_path( Extension(req_state): Extension<RequestState>, headers: HeaderMap, stream: BodyStream, -) -> ServerResult<String> { +) -> ServerResult<Json<UploadPathResult>> { let upload_info: UploadPathNarInfo = { let header = headers .get("X-Attic-Nar-Info") @@ -114,7 +114,16 @@ pub(crate) async fn upload_path( match existing_nar { Some(existing_nar) => { // Deduplicate - upload_path_dedup(username, cache, upload_info, stream, database, &state, existing_nar).await + upload_path_dedup( + username, + cache, + upload_info, + stream, + database, + &state, + existing_nar, + ) + .await } None => { // New NAR @@ -132,7 +141,7 @@ async fn upload_path_dedup( database: &DatabaseConnection, state: &State, existing_nar: NarGuard, -) -> ServerResult<String> { +) -> ServerResult<Json<UploadPathResult>> { if state.config.require_proof_of_possession { let (mut stream, nar_compute) = StreamHasher::new(stream, Sha256::new()); tokio::io::copy(&mut stream, &mut tokio::io::sink()) @@ -182,8 +191,9 @@ async fn upload_path_dedup( // Ensure it's not unlocked earlier drop(existing_nar); - // TODO - Ok("Success".to_string()) + Ok(Json(UploadPathResult { + kind: UploadPathResultKind::Deduplicated, + })) } /// Uploads a path when there is no matching NAR in the global cache. 
@@ -198,7 +208,7 @@ async fn upload_path_new( stream: impl AsyncRead + Send + Unpin + 'static, database: &DatabaseConnection, state: &State, -) -> ServerResult<String> { +) -> ServerResult<Json<UploadPathResult>> { let compression_config = &state.config.compression; let compression: Compression = compression_config.r#type.into(); let level = compression_config.level(); @@ -228,7 +238,7 @@ async fn upload_path_new( nar_hash: Set(upload_info.nar_hash.to_typed_base16()), nar_size: Set(nar_size_db), - remote_file: Set(Json(remote_file)), + remote_file: Set(DbJson(remote_file)), remote_file_id: Set(remote_file_id), created_at: Set(Utc::now()), @@ -327,8 +337,9 @@ async fn upload_path_new( cleanup.cancel(); - // TODO - Ok("Success".to_string()) + Ok(Json(UploadPathResult { + kind: UploadPathResultKind::Uploaded, + })) } impl CompressionStream { @@ -379,9 +390,9 @@ impl UploadPathNarInfoExt for UploadPathNarInfo { object::ActiveModel { store_path_hash: Set(self.store_path_hash.to_string()), store_path: Set(self.store_path.clone()), - references: Set(DbJson(self.references.clone())), + references: Set(DbJson(self.references.clone())), deriver: Set(self.deriver.clone()), - sigs: Set(Json(self.sigs.clone())), + sigs: Set(DbJson(self.sigs.clone())), ca: Set(self.ca.clone()), ..Default::default() }