diff --git a/Cargo.lock b/Cargo.lock index 80649de..a206ea4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -137,8 +137,10 @@ dependencies = [ name = "attic" version = "0.1.0" dependencies = [ + "async-stream", "base64 0.20.0", "bindgen", + "bytes", "cxx", "cxx-build", "digest", @@ -197,6 +199,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-compression", + "async-stream", "async-trait", "attic", "attic-token", @@ -213,6 +216,7 @@ dependencies = [ "digest", "displaydoc", "enum-as-inner", + "fastcdc", "futures", "hex", "humantime", @@ -229,6 +233,7 @@ dependencies = [ "serde_with", "sha2", "tokio", + "tokio-test", "tokio-util", "toml", "tower-http", @@ -1483,6 +1488,12 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "fastcdc" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3f1596230ad22715a97a82deba0403ece9e4a458d008fd2511518d72a115bef" + [[package]] name = "fastrand" version = "1.8.0" diff --git a/attic/Cargo.toml b/attic/Cargo.toml index 3ef5ee6..d120cba 100644 --- a/attic/Cargo.toml +++ b/attic/Cargo.toml @@ -5,7 +5,9 @@ edition = "2021" publish = false [dependencies] +async-stream = { version = "0.3.3", optional = true } base64 = "0.20.0" +bytes = "1.3.0" displaydoc = "0.2.3" digest = "0.10.6" ed25519-compact = "2.0.4" @@ -56,4 +58,4 @@ nix_store = [ "dep:cxx", "dep:bindgen", "dep:cxx-build" ] # Tokio. # # When disabled, any part depending on tokio is unavailable. -tokio = [ "dep:tokio" ] +tokio = [ "dep:tokio", "dep:async-stream" ] diff --git a/attic/src/stream.rs b/attic/src/stream.rs index 8d71c43..21908fa 100644 --- a/attic/src/stream.rs +++ b/attic/src/stream.rs @@ -1,13 +1,19 @@ //! Stream utilities. +use std::collections::VecDeque; +use std::future::Future; use std::marker::Unpin; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use async_stream::try_stream; +use bytes::Bytes; use digest::{Digest, Output as DigestOutput}; +use futures::stream::{BoxStream, Stream, StreamExt}; use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; use tokio::sync::OnceCell; +use tokio::task::spawn; /// Stream filter that hashes the bytes that have been read. /// @@ -19,6 +25,79 @@ pub struct StreamHasher { finalized: Arc, usize)>>, } +/// Merge chunks lazily into a continuous stream. +/// +/// For each chunk, a function is called to transform it into a +/// `Stream>`. This function does something like +/// opening the local file or sending a request to S3. +/// +/// We call this function some time before the start of the chunk +/// is reached to eliminate delays between chunks so the merged +/// stream is smooth. We don't want to start streaming all chunks +/// at once as it's a waste of resources. +/// +/// ```text +/// | S3 GET | Chunk | S3 GET | ... | S3 GET | Chunk +/// ``` +/// +/// ```text +/// | S3 GET | Chunk | Chunk | Chunk | Chunk +/// | S3 GET |-----------^ ^ ^ +/// | S3 GET |------| | +/// | S3 GET |--------------| +/// +/// ``` +/// +/// TODO: Support range requests so we can have seekable NARs. +pub fn merge_chunks( + mut chunks: VecDeque, + streamer: F, + streamer_arg: S, + num_prefetch: usize, +) -> Pin>>> +where + F: Fn(C, S) -> Fut, + S: Clone, + Fut: Future>, E>> + Send + 'static, + E: Send + 'static, +{ + let s = try_stream! 
{ + let mut streams = VecDeque::new(); // a queue of JoinHandles + + // otherwise type inference gets confused :/ + if false { + let chunk = chunks.pop_front().unwrap(); + let stream = spawn(streamer(chunk, streamer_arg.clone())); + streams.push_back(stream); + } + + loop { + if let Some(stream) = streams.pop_front() { + let mut stream = stream.await.unwrap()?; + while let Some(item) = stream.next().await { + let item = item?; + yield item; + } + } + + while streams.len() < num_prefetch { + if let Some(chunk) = chunks.pop_front() { + let stream = spawn(streamer(chunk, streamer_arg.clone())); + streams.push_back(stream); + } else { + break; + } + } + + if chunks.is_empty() && streams.is_empty() { + // we are done! + break; + } + } + }; + Box::pin(s) +} + impl StreamHasher { pub fn new(inner: R, digest: D) -> (Self, Arc, usize)>>) { let finalized = Arc::new(OnceCell::new()); @@ -105,6 +184,9 @@ pub async fn read_chunk_async( mod tests { use super::*; + use async_stream::stream; + use bytes::{BufMut, BytesMut}; + use futures::future; use tokio::io::AsyncReadExt; use tokio_test::block_on; @@ -135,4 +217,45 @@ mod tests { assert_eq!(expected.len(), *count); eprintln!("finalized = {:x?}", finalized); } + + #[test] + fn test_merge_chunks() { + let chunk_a: BoxStream> = { + let s = stream! { + yield Ok(Bytes::from_static(b"Hello")); + }; + Box::pin(s) + }; + + let chunk_b: BoxStream> = { + let s = stream! { + yield Ok(Bytes::from_static(b", ")); + yield Ok(Bytes::from_static(b"world")); + }; + Box::pin(s) + }; + + let chunk_c: BoxStream> = { + let s = stream! { + yield Ok(Bytes::from_static(b"!")); + }; + Box::pin(s) + }; + + let chunks: VecDeque>> = + [chunk_a, chunk_b, chunk_c].into_iter().collect(); + + let streamer = |c, _| future::ok(c); + let mut merged = merge_chunks(chunks, streamer, (), 2); + + let bytes = block_on(async move { + let mut bytes = BytesMut::with_capacity(100); + while let Some(item) = merged.next().await { + bytes.put(item.unwrap()); + } + bytes.freeze() + }); + + assert_eq!(&*bytes, b"Hello, world!"); + } } diff --git a/integration-tests/basic/default.nix b/integration-tests/basic/default.nix index 7ab9476..8b86f48 100644 --- a/integration-tests/basic/default.nix +++ b/integration-tests/basic/default.nix @@ -126,6 +126,13 @@ in { credentialsFile = "/etc/atticd.env"; settings = { listen = "[::]:8080"; + + chunking = { + nar-size-threshold = 1; + min-size = 64 * 1024; + avg-size = 128 * 1024; + max-size = 256 * 1024; + }; }; }; @@ -196,6 +203,13 @@ in { server.succeed("${cmd.atticd} --mode garbage-collector-once") client.fail(f"curl -sL --fail-with-body http://server:8080/test/{test_file_hash}.narinfo") + ${lib.optionalString (config.storage == "local") '' + with subtest("Check that all chunks are actually deleted after GC"): + files = server.succeed("find /var/lib/atticd/storage -type f") + print(f"Remaining files: {files}") + assert files.strip() == "" + ''} + with subtest("Check that we can destroy the cache"): client.succeed("attic cache info test") client.succeed("attic cache destroy --no-confirm test") diff --git a/server/Cargo.toml b/server/Cargo.toml index 873cc39..8623091 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -23,6 +23,7 @@ attic = { path = "../attic", default-features = false, features = [ "tokio" ] } attic-token = { path = "../token" } anyhow = "1.0.68" +async-stream = "0.3.3" async-trait = "0.1.60" aws-config = "0.52.0" aws-sdk-s3 = "0.22.0" @@ -36,6 +37,7 @@ derivative = "2.2.0" digest = "0.10.6" displaydoc = "0.2.3" enum-as-inner = "0.5.1" 
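// A minimal sketch (not part of the patch) of what a `streamer` for
// `merge_chunks` above could look like when chunks are plain files on local
// disk. The `ChunkOnDisk` type and the function name are hypothetical; the
// real server passes a closure over its storage backend instead (see
// `get_nar` in server/src/api/binary_cache.rs below).
use std::io;
use std::path::PathBuf;

use bytes::Bytes;
use futures::stream::BoxStream;
use futures::StreamExt;
use tokio::fs::File;
use tokio_util::io::ReaderStream;

struct ChunkOnDisk {
    path: PathBuf,
}

// Opening the file is the potentially slow step that `merge_chunks` overlaps
// with the streaming of earlier chunks (controlled by `num_prefetch`).
async fn stream_chunk_from_disk(
    chunk: ChunkOnDisk,
    _arg: (),
) -> io::Result<BoxStream<'static, io::Result<Bytes>>> {
    let file = File::open(&chunk.path).await?;
    Ok(ReaderStream::new(file).boxed())
}

// A caller would then do something like
// `merge_chunks(chunks, stream_chunk_from_disk, (), 2)` to obtain one
// continuous stream of `Bytes`, as exercised by `test_merge_chunks` above.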
+fastcdc = "1.0.7" futures = "0.3.25" hex = "0.4.3" humantime = "2.1.0" @@ -92,3 +94,6 @@ features = [ "rt-multi-thread", "sync", ] + +[dev-dependencies] +tokio-test = "0.4.2" diff --git a/server/src/api/binary_cache.rs b/server/src/api/binary_cache.rs index ec4bd0a..90b7afd 100644 --- a/server/src/api/binary_cache.rs +++ b/server/src/api/binary_cache.rs @@ -4,7 +4,10 @@ //! //! The implementation is based on the specifications at . +use std::collections::VecDeque; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::path::PathBuf; +use std::sync::Arc; use axum::{ body::StreamBody, @@ -14,19 +17,22 @@ use axum::{ routing::get, Router, }; +use futures::stream::BoxStream; use serde::Serialize; use tokio_util::io::ReaderStream; use tracing::instrument; +use crate::database::entity::chunk::ChunkModel; use crate::database::AtticDatabase; use crate::error::{ErrorKind, ServerResult}; use crate::narinfo::NarInfo; use crate::nix_manifest; -use crate::storage::Download; +use crate::storage::{Download, StorageBackend}; use crate::{RequestState, State}; use attic::cache::CacheName; use attic::mime; use attic::nix_store::StorePathHash; +use attic::stream::merge_chunks; /// Nix cache information. /// @@ -128,10 +134,10 @@ async fn get_store_path_info( cache_name ); - let (object, cache, nar) = state + let (object, cache, nar, chunks) = state .database() .await? - .find_object_by_store_path_hash(&cache_name, &store_path_hash) + .find_object_and_chunks_by_store_path_hash(&cache_name, &store_path_hash) .await?; let permission = req_state @@ -141,6 +147,11 @@ async fn get_store_path_info( req_state.set_public_cache(cache.is_public); + if chunks.iter().any(Option::is_none) { + // at least one of the chunks is missing :( + return Err(ErrorKind::IncompleteNar.into()); + } + let mut narinfo = object.to_nar_info(&nar)?; if narinfo.signature().is_none() { @@ -184,8 +195,8 @@ async fn get_nar( let database = state.database().await?; - let (object, cache, nar) = database - .find_object_by_store_path_hash(&cache_name, &store_path_hash) + let (object, cache, _nar, chunks) = database + .find_object_and_chunks_by_store_path_hash(&cache_name, &store_path_hash) .await?; let permission = req_state @@ -195,18 +206,62 @@ async fn get_nar( req_state.set_public_cache(cache.is_public); + if chunks.iter().any(Option::is_none) { + // at least one of the chunks is missing :( + return Err(ErrorKind::IncompleteNar.into()); + } + database.bump_object_last_accessed(object.id).await?; - let remote_file = nar.remote_file.0; - let backend = state.storage().await?; - match backend.download_file_db(&remote_file).await? { - Download::Redirect(uri) => Ok(Redirect::temporary(&uri).into_response()), - Download::Stream(stream) => { - let stream = ReaderStream::new(stream); - let body = StreamBody::new(stream); - - Ok(body.into_response()) + if chunks.len() == 1 { + // single chunk + let chunk = chunks[0].as_ref().unwrap(); + let remote_file = &chunk.remote_file.0; + let storage = state.storage().await?; + match storage.download_file_db(remote_file, false).await? 
{ + Download::Url(url) => Ok(Redirect::temporary(&url).into_response()), + Download::Stream(stream) => { + let body = StreamBody::new(stream); + Ok(body.into_response()) + } + Download::AsyncRead(stream) => { + let stream = ReaderStream::new(stream); + let body = StreamBody::new(stream); + Ok(body.into_response()) + } } + } else { + // reassemble NAR + fn io_error(e: E) -> IoError { + IoError::new(IoErrorKind::Other, e) + } + + let streamer = |chunk: ChunkModel, storage: Arc>| async move { + match storage + .download_file_db(&chunk.remote_file.0, true) + .await + .map_err(io_error)? + { + Download::Url(_) => Err(IoError::new( + IoErrorKind::Other, + "URLs not supported for NAR reassembly", + )), + Download::Stream(stream) => Ok(stream), + Download::AsyncRead(stream) => { + let stream: BoxStream<_> = Box::pin(ReaderStream::new(stream)); + Ok(stream) + } + } + }; + + let chunks: VecDeque<_> = chunks.into_iter().map(Option::unwrap).collect(); + let storage = state.storage().await?.clone(); + + // TODO: Make num_prefetch configurable + // The ideal size depends on the average chunk size + let merged = merge_chunks(chunks, streamer, storage, 2); + let body = StreamBody::new(merged); + Ok(body.into_response()) } } diff --git a/server/src/api/v1/get_missing_paths.rs b/server/src/api/v1/get_missing_paths.rs index ce8e427..23b4530 100644 --- a/server/src/api/v1/get_missing_paths.rs +++ b/server/src/api/v1/get_missing_paths.rs @@ -6,6 +6,7 @@ use sea_orm::{FromQueryResult, QuerySelect}; use tracing::instrument; use crate::database::entity::cache; +use crate::database::entity::nar; use crate::database::entity::object::{self, Entity as Object}; use crate::error::{ServerError, ServerResult}; use crate::{RequestState, State}; @@ -48,8 +49,10 @@ pub(crate) async fn get_missing_paths( .select_only() .column_as(object::Column::StorePathHash, "store_path_hash") .join(sea_orm::JoinType::InnerJoin, object::Relation::Cache.def()) + .join(sea_orm::JoinType::InnerJoin, object::Relation::Nar.def()) .filter(cache::Column::Name.eq(payload.cache.as_str())) .filter(object::Column::StorePathHash.is_in(query_in)) + .filter(nar::Column::CompletenessHint.eq(true)) .into_model::() .all(database) .await diff --git a/server/src/api/v1/upload_path.rs b/server/src/api/v1/upload_path.rs index ac284bd..f3e1aa0 100644 --- a/server/src/api/v1/upload_path.rs +++ b/server/src/api/v1/upload_path.rs @@ -1,23 +1,29 @@ use std::io; +use std::io::Cursor; use std::marker::Unpin; use std::sync::Arc; use anyhow::anyhow; use async_compression::tokio::bufread::{BrotliEncoder, XzEncoder, ZstdEncoder}; +use async_compression::Level as CompressionLevel; use axum::{ extract::{BodyStream, Extension, Json}, http::HeaderMap, }; +use bytes::Bytes; use chrono::Utc; use digest::Output as DigestOutput; +use futures::future::join_all; use futures::StreamExt; use sea_orm::entity::prelude::*; +use sea_orm::sea_query::Expr; use sea_orm::ActiveValue::Set; -use sea_orm::TransactionTrait; +use sea_orm::{QuerySelect, TransactionTrait}; use sha2::{Digest, Sha256}; -use tokio::io::{AsyncRead, BufReader}; -use tokio::sync::OnceCell; +use tokio::io::{AsyncBufRead, AsyncRead, BufReader}; +use tokio::sync::{OnceCell, Semaphore}; +use tokio::task::spawn; use tokio_util::io::StreamReader; use tracing::instrument; use uuid::Uuid; @@ -31,14 +37,31 @@ use attic::hash::Hash; use attic::stream::StreamHasher; use attic::util::Finally; +use crate::chunking::chunk_stream; use crate::database::entity::cache; +use crate::database::entity::chunk::{self, ChunkState, Entity as 
Chunk}; +use crate::database::entity::chunkref::{self, Entity as ChunkRef}; use crate::database::entity::nar::{self, Entity as Nar, NarState}; use crate::database::entity::object::{self, Entity as Object}; use crate::database::entity::Json as DbJson; -use crate::database::{AtticDatabase, NarGuard}; +use crate::database::{AtticDatabase, ChunkGuard, NarGuard}; + +/// Number of chunks to upload to the storage backend at once. +/// +/// TODO: Make this configurable +const CONCURRENT_CHUNK_UPLOADS: usize = 10; type CompressorFn = Box Box + Send>; +/// Data of a chunk. +enum ChunkData { + /// Some bytes in memory. + Bytes(Bytes), + + /// A stream with a user-claimed hash and size that are potentially incorrect. + Stream(Box, Hash, usize), +} + /// Applies compression to a stream, computing hashes along the way. /// /// Our strategy is to stream directly onto a UUID-keyed file on the @@ -112,18 +135,33 @@ pub(crate) async fn upload_path( // Try to acquire a lock on an existing NAR let existing_nar = database.find_and_lock_nar(&upload_info.nar_hash).await?; match existing_nar { + // FIXME: existing NAR may be missing chunks Some(existing_nar) => { - // Deduplicate - upload_path_dedup( - username, - cache, - upload_info, - stream, - database, - &state, - existing_nar, - ) - .await + // Deduplicate? + let missing_chunk = ChunkRef::find() + .filter(chunkref::Column::NarId.eq(existing_nar.id)) + .filter(chunkref::Column::ChunkId.is_null()) + .limit(1) + .one(database) + .await + .map_err(ServerError::database_error)?; + + if missing_chunk.is_some() { + // Need to repair + upload_path_new(username, cache, upload_info, stream, database, &state).await + } else { + // Can actually be deduplicated + upload_path_dedup( + username, + cache, + upload_info, + stream, + database, + &state, + existing_nar, + ) + .await + } } None => { // New NAR @@ -161,10 +199,6 @@ async fn upload_path_dedup( } } - let file_size = existing_nar.file_size - .map(|dbs| dbs.try_into().map_err(ServerError::database_error)) - .transpose()?; - // Finally... let txn = database .begin() @@ -197,7 +231,7 @@ async fn upload_path_dedup( Ok(Json(UploadPathResult { kind: UploadPathResultKind::Deduplicated, - file_size, + file_size: None, // TODO: Sum the chunks })) } @@ -214,105 +248,232 @@ async fn upload_path_new( database: &DatabaseConnection, state: &State, ) -> ServerResult> { + let nar_size_threshold = state.config.chunking.nar_size_threshold; + + if nar_size_threshold == 0 || upload_info.nar_size < nar_size_threshold { + upload_path_new_unchunked(username, cache, upload_info, stream, database, state).await + } else { + upload_path_new_chunked(username, cache, upload_info, stream, database, state).await + } +} + +/// Uploads a path when there is no matching NAR in the global cache (chunked). 
+async fn upload_path_new_chunked( + username: Option, + cache: cache::Model, + upload_info: UploadPathNarInfo, + stream: impl AsyncRead + Send + Unpin + 'static, + database: &DatabaseConnection, + state: &State, +) -> ServerResult> { + let chunking_config = &state.config.chunking; let compression_config = &state.config.compression; - let compression: Compression = compression_config.r#type.into(); - let level = compression_config.level(); - let compressor: CompressorFn<_> = match compression_config.r#type { - CompressionType::None => Box::new(|c| Box::new(c)), - CompressionType::Brotli => { - Box::new(move |s| Box::new(BrotliEncoder::with_quality(s, level))) - } - CompressionType::Zstd => Box::new(move |s| Box::new(ZstdEncoder::with_quality(s, level))), - CompressionType::Xz => Box::new(move |s| Box::new(XzEncoder::with_quality(s, level))), - }; + let compression_type = compression_config.r#type; + let compression_level = compression_config.level(); + let compression: Compression = compression_type.into(); - let backend = state.storage().await?; + let nar_size_db = i64::try_from(upload_info.nar_size).map_err(ServerError::request_error)?; - let key = format!("{}.nar", Uuid::new_v4()); + // FIXME: Maybe the client will send much more data than claimed + let (stream, nar_compute) = StreamHasher::new(stream, Sha256::new()); + let mut chunks = chunk_stream( + stream, + chunking_config.min_size, + chunking_config.avg_size, + chunking_config.max_size, + ); - let remote_file = backend.make_db_reference(key.clone()).await?; - let remote_file_id = remote_file.remote_file_id(); - let nar_id = { - let nar_size_db = - i64::try_from(upload_info.nar_size).map_err(ServerError::request_error)?; - let model = nar::ActiveModel { - state: Set(NarState::PendingUpload), - compression: Set(compression.to_string()), + let upload_chunk_limit = Arc::new(Semaphore::new(CONCURRENT_CHUNK_UPLOADS)); + let mut futures = Vec::new(); - // Untrusted data - To be confirmed later - nar_hash: Set(upload_info.nar_hash.to_typed_base16()), - nar_size: Set(nar_size_db), + while let Some(bytes) = chunks.next().await { + let bytes = bytes.map_err(ServerError::request_error)?; + let data = ChunkData::Bytes(bytes); - remote_file: Set(DbJson(remote_file)), - remote_file_id: Set(remote_file_id), + // Wait for a permit before spawning + // + // We want to block the receive process as well, otherwise it stays ahead and + // consumes too much memory + let permit = upload_chunk_limit.clone().acquire_owned().await.unwrap(); + futures.push({ + let database = database.clone(); + let state = state.clone(); + let require_proof_of_possession = state.config.require_proof_of_possession; - created_at: Set(Utc::now()), - ..Default::default() - }; - - let insertion = Nar::insert(model) - .exec(database) - .await - .map_err(ServerError::database_error)?; - - insertion.last_insert_id - }; - - let cleanup = Finally::new({ - let database = database.clone(); - let nar_model = nar::ActiveModel { - id: Set(nar_id), - ..Default::default() - }; - let backend = backend.clone(); - let key = key.clone(); - - async move { - tracing::warn!("Error occurred - Cleaning up uploaded file and NAR entry"); - - if let Err(e) = backend.delete_file(key).await { - tracing::warn!("Failed to clean up failed upload: {}", e); - } - - if let Err(e) = Nar::delete(nar_model).exec(&database).await { - tracing::warn!("Failed to unregister failed NAR: {}", e); - } - } - }); - - let mut stream = CompressionStream::new(stream, compressor); - - // Stream the object to the storage 
backend - backend - .upload_file(key, stream.stream()) - .await - .map_err(ServerError::storage_error)?; + spawn(async move { + let chunk = upload_chunk( + data, + compression_type, + compression_level, + database, + state, + require_proof_of_possession, + ) + .await?; + drop(permit); + Ok(chunk) + }) + }); + } // Confirm that the NAR Hash and Size are correct // FIXME: errors - let (nar_hash, nar_size) = stream.nar_hash_and_size().unwrap(); - let (file_hash, file_size) = stream.file_hash_and_size().unwrap(); - + let (nar_hash, nar_size) = nar_compute.get().unwrap(); let nar_hash = Hash::Sha256(nar_hash.as_slice().try_into().unwrap()); - let file_hash = Hash::Sha256(file_hash.as_slice().try_into().unwrap()); if nar_hash != upload_info.nar_hash || *nar_size != upload_info.nar_size { return Err(ErrorKind::RequestError(anyhow!("Bad NAR Hash or Size")).into()); } + // Wait for all uploads to complete + let chunks: Vec = join_all(futures) + .await + .into_iter() + .map(|join_result| join_result.unwrap()) + .collect::>>()?; + + let file_size = chunks + .iter() + .fold(0, |acc, c| acc + c.file_size.unwrap() as usize); + // Finally... let txn = database .begin() .await .map_err(ServerError::database_error)?; - // Update the file hash and size, and set the nar to valid - let file_size_db = i64::try_from(*file_size).map_err(ServerError::request_error)?; - Nar::update(nar::ActiveModel { - id: Set(nar_id), - state: Set(NarState::Valid), - file_hash: Set(Some(file_hash.to_typed_base16())), - file_size: Set(Some(file_size_db)), + // Create a NAR entry + let nar_id = { + let model = nar::ActiveModel { + state: Set(NarState::Valid), + compression: Set(compression.to_string()), + + nar_hash: Set(upload_info.nar_hash.to_typed_base16()), + nar_size: Set(nar_size_db), + + num_chunks: Set(chunks.len() as i32), + + created_at: Set(Utc::now()), + ..Default::default() + }; + + let insertion = Nar::insert(model) + .exec(&txn) + .await + .map_err(ServerError::database_error)?; + + insertion.last_insert_id + }; + + // Create mappings from the NAR to the chunks + for (i, chunk) in chunks.iter().enumerate() { + ChunkRef::insert(chunkref::ActiveModel { + nar_id: Set(nar_id), + seq: Set(i as i32), + chunk_id: Set(Some(chunk.id)), + chunk_hash: Set(chunk.chunk_hash.clone()), + compression: Set(chunk.compression.clone()), + ..Default::default() + }) + .exec(&txn) + .await + .map_err(ServerError::database_error)?; + } + + // Create a mapping granting the local cache access to the NAR + Object::delete_many() + .filter(object::Column::CacheId.eq(cache.id)) + .filter(object::Column::StorePathHash.eq(upload_info.store_path_hash.to_string())) + .exec(&txn) + .await + .map_err(ServerError::database_error)?; + Object::insert({ + let mut new_object = upload_info.to_active_model(); + new_object.cache_id = Set(cache.id); + new_object.nar_id = Set(nar_id); + new_object.created_at = Set(Utc::now()); + new_object.created_by = Set(username); + new_object + }) + .exec(&txn) + .await + .map_err(ServerError::database_error)?; + + txn.commit().await.map_err(ServerError::database_error)?; + + Ok(Json(UploadPathResult { + kind: UploadPathResultKind::Uploaded, + file_size: Some(file_size), + })) +} + +/// Uploads a path when there is no matching NAR in the global cache (unchunked). +/// +/// We upload the entire NAR as a single chunk. 
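// Self-contained sketch (not part of the patch) of the backpressure pattern
// used by `upload_path_new_chunked` above: a `Semaphore` permit is acquired
// *before* each task is spawned, so the producer loop stalls once `LIMIT`
// tasks are in flight instead of buffering every chunk in memory. `process`
// is a stand-in for `upload_chunk`, and `LIMIT` mirrors
// `CONCURRENT_CHUNK_UPLOADS`.
use std::sync::Arc;

use futures::future::join_all;
use tokio::sync::Semaphore;
use tokio::task::spawn;

const LIMIT: usize = 10;

async fn process(item: u64) -> Result<u64, String> {
    // Stand-in for compressing and uploading one chunk.
    Ok(item * 2)
}

async fn run(items: Vec<u64>) -> Result<Vec<u64>, String> {
    let limit = Arc::new(Semaphore::new(LIMIT));
    let mut futures = Vec::new();

    for item in items {
        // Blocks the producer once LIMIT tasks are already running.
        let permit = limit.clone().acquire_owned().await.unwrap();
        futures.push(spawn(async move {
            let result = process(item).await;
            drop(permit); // frees a slot for the next spawn
            result
        }));
    }

    // Wait for everything, propagating the first error like the real code.
    join_all(futures)
        .await
        .into_iter()
        .map(|join_result| join_result.unwrap())
        .collect()
}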
+async fn upload_path_new_unchunked( + username: Option, + cache: cache::Model, + upload_info: UploadPathNarInfo, + stream: impl AsyncRead + Send + Unpin + 'static, + database: &DatabaseConnection, + state: &State, +) -> ServerResult> { + let compression_config = &state.config.compression; + let compression_type = compression_config.r#type; + let compression: Compression = compression_type.into(); + + // Upload the entire NAR as a single chunk + let data = ChunkData::Stream( + Box::new(stream), + upload_info.nar_hash.clone(), + upload_info.nar_size, + ); + let chunk = upload_chunk( + data, + compression_type, + compression_config.level(), + database.clone(), + state.clone(), + state.config.require_proof_of_possession, + ) + .await?; + let file_size = chunk.file_size.unwrap() as usize; + + // Finally... + let txn = database + .begin() + .await + .map_err(ServerError::database_error)?; + + // Create a NAR entry + let nar_id = { + let model = nar::ActiveModel { + state: Set(NarState::Valid), + compression: Set(compression.to_string()), + + nar_hash: Set(upload_info.nar_hash.to_typed_base16()), + nar_size: Set(chunk.chunk_size), + + num_chunks: Set(1), + + created_at: Set(Utc::now()), + ..Default::default() + }; + + let insertion = Nar::insert(model) + .exec(&txn) + .await + .map_err(ServerError::database_error)?; + + insertion.last_insert_id + }; + + // Create a mapping from the NAR to the chunk + ChunkRef::insert(chunkref::ActiveModel { + nar_id: Set(nar_id), + seq: Set(0), + chunk_id: Set(Some(chunk.id)), + chunk_hash: Set(upload_info.nar_hash.to_typed_base16()), ..Default::default() }) .exec(&txn) @@ -340,15 +501,225 @@ async fn upload_path_new( txn.commit().await.map_err(ServerError::database_error)?; - cleanup.cancel(); - Ok(Json(UploadPathResult { kind: UploadPathResultKind::Uploaded, - file_size: Some(*file_size), + file_size: Some(file_size), })) } +/// Uploads a chunk with the desired compression. +/// +/// This will automatically perform deduplication if the chunk exists. +async fn upload_chunk( + data: ChunkData, + compression_type: CompressionType, + compression_level: CompressionLevel, + database: DatabaseConnection, + state: State, + require_proof_of_possession: bool, +) -> ServerResult { + let compression: Compression = compression_type.into(); + + let given_chunk_hash = data.hash(); + let given_chunk_size = data.size(); + + if let Some(existing_chunk) = database + .find_and_lock_chunk(&given_chunk_hash, compression) + .await? 
+ { + // There's an existing chunk matching the hash + if require_proof_of_possession && !data.is_hash_trusted() { + let stream = data.into_async_read(); + + let (mut stream, nar_compute) = StreamHasher::new(stream, Sha256::new()); + tokio::io::copy(&mut stream, &mut tokio::io::sink()) + .await + .map_err(ServerError::request_error)?; + + // FIXME: errors + let (nar_hash, nar_size) = nar_compute.get().unwrap(); + let nar_hash = Hash::Sha256(nar_hash.as_slice().try_into().unwrap()); + + // Confirm that the NAR Hash and Size are correct + if nar_hash.to_typed_base16() != existing_chunk.chunk_hash + || *nar_size != given_chunk_size + || *nar_size != existing_chunk.chunk_size as usize + { + return Err(ErrorKind::RequestError(anyhow!("Bad chunk hash or size")).into()); + } + } + + return Ok(existing_chunk); + } + + let key = format!("{}.chunk", Uuid::new_v4()); + + let backend = state.storage().await?; + let remote_file = backend.make_db_reference(key.clone()).await?; + let remote_file_id = remote_file.remote_file_id(); + + let chunk_size_db = i64::try_from(given_chunk_size).map_err(ServerError::request_error)?; + + let chunk_id = { + let model = chunk::ActiveModel { + state: Set(ChunkState::PendingUpload), + compression: Set(compression.to_string()), + + // Untrusted data - To be confirmed later + chunk_hash: Set(given_chunk_hash.to_typed_base16()), + chunk_size: Set(chunk_size_db), + + remote_file: Set(DbJson(remote_file)), + remote_file_id: Set(remote_file_id), + + created_at: Set(Utc::now()), + ..Default::default() + }; + + let insertion = Chunk::insert(model) + .exec(&database) + .await + .map_err(ServerError::database_error)?; + + insertion.last_insert_id + }; + + let cleanup = Finally::new({ + let database = database.clone(); + let chunk_model = chunk::ActiveModel { + id: Set(chunk_id), + ..Default::default() + }; + let backend = backend.clone(); + let key = key.clone(); + + async move { + tracing::warn!("Error occurred - Cleaning up uploaded file and chunk entry"); + + if let Err(e) = backend.delete_file(key).await { + tracing::warn!("Failed to clean up failed upload: {}", e); + } + + if let Err(e) = Chunk::delete(chunk_model).exec(&database).await { + tracing::warn!("Failed to unregister failed chunk: {}", e); + } + } + }); + + // Compress and stream to the storage backend + let compressor = get_compressor_fn(compression_type, compression_level); + let mut stream = CompressionStream::new(data.into_async_read(), compressor); + + backend + .upload_file(key, stream.stream()) + .await + .map_err(ServerError::storage_error)?; + + // Confirm that the chunk hash is correct + let (chunk_hash, chunk_size) = stream.nar_hash_and_size().unwrap(); + let (file_hash, file_size) = stream.file_hash_and_size().unwrap(); + + let chunk_hash = Hash::Sha256(chunk_hash.as_slice().try_into().unwrap()); + let file_hash = Hash::Sha256(file_hash.as_slice().try_into().unwrap()); + + if chunk_hash != given_chunk_hash || *chunk_size != given_chunk_size { + return Err(ErrorKind::RequestError(anyhow!("Bad chunk hash or size")).into()); + } + + // Finally... 
+ let txn = database + .begin() + .await + .map_err(ServerError::database_error)?; + + // Update the file hash and size, and set the chunk to valid + let file_size_db = i64::try_from(*file_size).map_err(ServerError::request_error)?; + let chunk = Chunk::update(chunk::ActiveModel { + id: Set(chunk_id), + state: Set(ChunkState::Valid), + file_hash: Set(Some(file_hash.to_typed_base16())), + file_size: Set(Some(file_size_db)), + holders_count: Set(1), + ..Default::default() + }) + .exec(&txn) + .await + .map_err(ServerError::database_error)?; + + // Also repair broken chunk references pointing at the same chunk + let repaired = ChunkRef::update_many() + .col_expr(chunkref::Column::ChunkId, Expr::value(chunk_id)) + .filter(chunkref::Column::ChunkId.is_null()) + .filter(chunkref::Column::ChunkHash.eq(chunk_hash.to_typed_base16())) + .filter(chunkref::Column::Compression.eq(compression.to_string())) + .exec(&txn) + .await + .map_err(ServerError::database_error)?; + + txn.commit().await.map_err(ServerError::database_error)?; + + cleanup.cancel(); + + tracing::debug!("Repaired {} chunkrefs", repaired.rows_affected); + + let guard = ChunkGuard::from_locked(database.clone(), chunk); + + Ok(guard) +} + +/// Returns a compressor function that takes some stream as input. +fn get_compressor_fn( + ctype: CompressionType, + level: CompressionLevel, +) -> CompressorFn { + match ctype { + CompressionType::None => Box::new(|c| Box::new(c)), + CompressionType::Brotli => { + Box::new(move |s| Box::new(BrotliEncoder::with_quality(s, level))) + } + CompressionType::Zstd => Box::new(move |s| Box::new(ZstdEncoder::with_quality(s, level))), + CompressionType::Xz => Box::new(move |s| Box::new(XzEncoder::with_quality(s, level))), + } +} + +impl ChunkData { + /// Returns the potentially-incorrect hash of the chunk. + fn hash(&self) -> Hash { + match self { + Self::Bytes(bytes) => { + let mut hasher = Sha256::new(); + hasher.update(bytes); + let hash = hasher.finalize(); + Hash::Sha256(hash.as_slice().try_into().unwrap()) + } + Self::Stream(_, hash, _) => hash.clone(), + } + } + + /// Returns the potentially-incorrect size of the chunk. + fn size(&self) -> usize { + match self { + Self::Bytes(bytes) => bytes.len(), + Self::Stream(_, _, size) => *size, + } + } + + /// Returns whether the hash is trusted. + fn is_hash_trusted(&self) -> bool { + matches!(self, ChunkData::Bytes(_)) + } + + /// Turns the data into a stream. + fn into_async_read(self) -> Box { + match self { + Self::Bytes(bytes) => Box::new(Cursor::new(bytes)), + Self::Stream(stream, _, _) => stream, + } + } +} + impl CompressionStream { + /// Creates a new compression stream. fn new(stream: R, compressor: CompressorFn>>) -> Self where R: AsyncRead + Unpin + Send + 'static, @@ -369,6 +740,29 @@ impl CompressionStream { } } + /* + /// Creates a compression stream without compute the uncompressed hash/size. + /// + /// This is useful if you already know the hash. `nar_hash_and_size` will + /// always return `None`. + fn new_without_nar_hash(stream: R, compressor: CompressorFn>) -> Self + where + R: AsyncRead + Unpin + Send + 'static, + { + // compress NAR + let stream = compressor(BufReader::new(stream)); + + // compute file hash and size + let (stream, file_compute) = StreamHasher::new(stream, Sha256::new()); + + Self { + stream: Box::new(stream), + nar_compute: Arc::new(OnceCell::new()), + file_compute, + } + } + */ + /// Returns the stream of the compressed object. 
fn stream(&mut self) -> &mut (impl AsyncRead + Unpin) { &mut self.stream diff --git a/server/src/chunking/mod.rs b/server/src/chunking/mod.rs new file mode 100644 index 0000000..4896046 --- /dev/null +++ b/server/src/chunking/mod.rs @@ -0,0 +1,123 @@ +//! Chunking. +//! +//! We perform chunking on uncompressed NARs using the FastCDC +//! algorithm. + +use async_stream::try_stream; +use bytes::{BufMut, Bytes, BytesMut}; +use fastcdc::FastCDC; +use futures::stream::Stream; +use tokio::io::{AsyncRead, AsyncReadExt}; + +/// Splits a streams into content-defined chunks. +/// +/// This is a wrapper over fastcdc-rs that takes an `AsyncRead` and +/// returns a `Stream` of chunks as `Bytes`s. +pub fn chunk_stream( + mut stream: R, + min_size: usize, + avg_size: usize, + max_size: usize, +) -> impl Stream> +where + R: AsyncRead + Unpin + Send, +{ + let s = try_stream! { + let mut buf = BytesMut::with_capacity(max_size); + + loop { + let read = read_chunk_async(&mut stream, buf).await?; + + let mut eof = false; + if read.len() == 0 { + // Already EOF + break; + } else if read.len() < max_size { + // Last read + eof = true; + } + + let chunks = FastCDC::with_eof(&read, min_size, avg_size, max_size, eof); + let mut consumed = 0; + + for chunk in chunks { + consumed += chunk.length; + + let slice = read.slice(chunk.offset..chunk.offset + chunk.length); + yield slice; + } + + if eof { + break; + } + + buf = BytesMut::with_capacity(max_size); + + if consumed < read.len() { + // remaining bytes for the next read + buf.put_slice(&read[consumed..]); + } + } + }; + + Box::pin(s) +} + +async fn read_chunk_async( + stream: &mut S, + mut chunk: BytesMut, +) -> std::io::Result { + while chunk.len() < chunk.capacity() { + let read = stream.read_buf(&mut chunk).await?; + + if read == 0 { + break; + } + } + + Ok(chunk.freeze()) +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::io::Cursor; + + use futures::StreamExt; + use tokio_test::block_on; + + /// Chunks and reconstructs a file. + #[test] + fn test_chunking_basic() { + block_on(async move { + let test_file = get_data(32 * 1024 * 1024); // 32 MiB + let mut reconstructed_file = Vec::new(); + + let cursor = Cursor::new(&test_file); + let mut chunks = chunk_stream(cursor, 8 * 1024, 16 * 1024, 32 * 1024); + + while let Some(chunk) = chunks.next().await { + let chunk = chunk.unwrap(); + eprintln!("Got a {}-byte chunk", chunk.len()); + reconstructed_file.extend(chunk); + } + + assert_eq!(reconstructed_file, test_file); + }); + } + + /// Returns some fake data. + fn get_data(len: usize) -> Vec { + let mut state = 42u32; + let mut data = vec![0u8; len]; + + for i in 0..data.len() { + (state, _) = state.overflowing_mul(1664525u32); + (state, _) = state.overflowing_add(1013904223u32); + data[i] = ((state >> (i % 24)) & 0xff) as u8; + } + + data + } +} diff --git a/server/src/config-template.toml b/server/src/config-template.toml index 1add4c8..84127d2 100644 --- a/server/src/config-template.toml +++ b/server/src/config-template.toml @@ -86,6 +86,28 @@ path = "%storage_path%" # access_key_id = "" # secret_access_key = "" +# Data chunking +# +# Warning: If you change any of the values here, it will be +# difficult to reuse existing chunks for newly-uploaded NARs +# since the cutpoints will be different. As a result, the +# deduplication ratio will suffer for a while after the change. +[chunking] +# The minimum NAR size to trigger chunking +# +# If 0, chunking is disabled entirely for newly-uploaded NARs. +# If 1, all NARs are chunked. 
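// Illustrative sketch (not part of the patch) of why content-defined chunking
// is used here: inserting a few bytes near the start of an input only
// disturbs the chunks around the edit, so most chunk contents are unchanged
// and can be deduplicated. It uses only the `FastCDC` API already used by
// `chunk_stream` above; the sizes mirror the test above and the data is
// arbitrary.
use std::collections::HashSet;

use fastcdc::FastCDC;

fn chunk_set(data: &[u8]) -> HashSet<&[u8]> {
    FastCDC::new(data, 8 * 1024, 16 * 1024, 32 * 1024)
        .map(|chunk| &data[chunk.offset..chunk.offset + chunk.length])
        .collect()
}

fn main() {
    // Some deterministic pseudo-random data.
    let mut original = vec![0u8; 4 * 1024 * 1024];
    let mut state = 42u32;
    for byte in original.iter_mut() {
        state = state.wrapping_mul(1664525).wrapping_add(1013904223);
        *byte = (state >> 16) as u8;
    }

    // Insert a few bytes near the beginning, shifting everything after them.
    let mut edited = original.clone();
    edited.splice(1000..1000, b"hello".iter().copied());

    let a = chunk_set(&original);
    let b = chunk_set(&edited);
    let shared = a.intersection(&b).count();

    // With fixed-size chunks almost nothing would match after the shift;
    // with FastCDC the cut-points resynchronize shortly after the edit.
    println!("{} of {} chunks unchanged after the edit", shared, a.len());
}

// The flip side is the warning in the chunking config above: running FastCDC
// with different min/avg/max parameters produces different cut-points, so
// previously stored chunks stop matching newly-uploaded data.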
+nar-size-threshold = 65536 # chunk files that are 64 KiB or larger + +# The preferred minimum size of a chunk, in bytes +min-size = 16384 # 16 KiB + +# The preferred average size of a chunk, in bytes +avg-size = 65536 # 64 KiB + +# The preferred maximum size of a chunk, in bytes +max-size = 262144 # 256 KiB + # Compression [compression] # Compression type diff --git a/server/src/config.rs b/server/src/config.rs index d77e811..3de159f 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -92,6 +92,9 @@ pub struct Config { /// Storage. pub storage: StorageConfig, + /// Data chunking. + pub chunking: ChunkingConfig, + /// Compression. #[serde(default = "Default::default")] pub compression: CompressionConfig, @@ -137,6 +140,46 @@ pub enum StorageConfig { S3(S3StorageConfig), } +/// Data chunking. +/// +/// This must be set, but a default set of values is provided +/// through the OOBE sequence. The reason is that this allows +/// us to provide a new set of recommended "defaults" for newer +/// deployments without affecting existing ones. +/// +/// Warning: If you change any of the values here, it will be +/// difficult to reuse existing chunks for newly-uploaded NARs +/// since the cutpoints will be different. As a result, the +/// deduplication ratio will suffer for a while after the change. +/// +/// `atticadm test-chunking` provides a way to test chunking +/// on a set of files so you can fine-tune the values. +#[derive(Debug, Clone, Deserialize)] +pub struct ChunkingConfig { + /// The minimum NAR size to trigger chunking. + /// + /// If 0, chunking is disabled entirely for newly-uploaded + /// NARs. + /// + /// If 1, all newly-uploaded NARs are chunked. + /// + /// By default, the threshold is 128KB. + #[serde(rename = "nar-size-threshold")] + pub nar_size_threshold: usize, + + /// The preferred minimum size of a chunk, in bytes. + #[serde(rename = "min-size")] + pub min_size: usize, + + /// The preferred average size of a chunk, in bytes. + #[serde(rename = "avg-size")] + pub avg_size: usize, + + /// The preferred maximum size of a chunk, in bytes. + #[serde(rename = "max-size")] + pub max_size: usize, +} + /// Compression configuration. #[derive(Debug, Clone, Deserialize)] pub struct CompressionConfig { @@ -294,7 +337,7 @@ fn load_config_from_str(s: &str) -> Result { /// Loads the configuration in the standard order. pub async fn load_config(config_path: Option<&Path>, allow_oobe: bool) -> Result { if let Some(config_path) = config_path { - load_config_from_path(&config_path) + load_config_from_path(config_path) } else if let Ok(config_env) = env::var(ENV_CONFIG_BASE64) { let decoded = String::from_utf8(base64::decode(config_env.as_bytes())?)?; load_config_from_str(&decoded) diff --git a/server/src/database/entity/chunk.rs b/server/src/database/entity/chunk.rs new file mode 100644 index 0000000..0c302ae --- /dev/null +++ b/server/src/database/entity/chunk.rs @@ -0,0 +1,111 @@ +//! A content-addressed chunk in the global chunk store. + +use sea_orm::entity::prelude::*; + +use super::Json; +use crate::storage::RemoteFile; + +pub type ChunkModel = Model; + +/// The state of a chunk. +#[derive(EnumIter, DeriveActiveEnum, Debug, Clone, PartialEq, Eq)] +#[sea_orm(rs_type = "String", db_type = "String(Some(1))")] +pub enum ChunkState { + /// The chunk can be used. + /// + /// The raw and compressed hashes are available. + #[sea_orm(string_value = "V")] + Valid, + + /// The chunk is a pending upload. + /// + /// The raw and compressed hashes may not be available. 
+ #[sea_orm(string_value = "P")] + PendingUpload, + + /// The chunk can be deleted because it already exists. + /// + /// This state can be transitioned into from `PendingUpload` + /// if some other client completes uploading the same chunk + /// faster. + #[sea_orm(string_value = "C")] + ConfirmedDeduplicated, + + /// The chunk is being deleted. + /// + /// This row will be deleted shortly. + #[sea_orm(string_value = "D")] + Deleted, +} + +/// A content-addressed chunk in the global cache. +#[derive(Debug, Clone, PartialEq, Eq, DeriveEntityModel)] +#[sea_orm(table_name = "chunk")] +pub struct Model { + /// Unique numeric ID of the chunk. + #[sea_orm(primary_key)] + pub id: i64, + + /// The state of the chunk. + state: ChunkState, + + /// The hash of the uncompressed chunk. + /// + /// This always begins with "sha256:" with the hash in the + /// hexadecimal format. + /// + /// The global chunk store may have several chunks with the same + /// hash: + /// + /// - Racing uploads from different clients + /// - Different compression methods + #[sea_orm(indexed)] + pub chunk_hash: String, + + /// The size of the uncompressed chunk. + pub chunk_size: i64, + + /// The hash of the compressed chunk. + /// + /// This always begins with "sha256:" with the hash in the + /// hexadecimal format. + /// + /// This field may not be available if the file hashes aren't + /// confirmed. + pub file_hash: Option, + + /// The size of the compressed chunk. + /// + /// This field may not be available if the file hashes aren't + /// confirmed. + pub file_size: Option, + + /// The type of compression in use. + #[sea_orm(column_type = "String(Some(10))")] + pub compression: String, + + /// The remote file backing this chunk. + pub remote_file: Json, + + /// Unique string identifying the remote file. + #[sea_orm(unique)] + pub remote_file_id: String, + + /// Number of processes holding this chunk. + /// + /// This is for preventing garbage collection of chunks when + /// there is a pending upload that can be deduplicated and + /// there are no existing NAR references. + pub holders_count: i32, + + /// Timestamp when the chunk is created. + pub created_at: ChronoDateTimeUtc, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm(has_many = "super::chunkref::Entity")] + ChunkRef, +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/server/src/database/entity/chunkref.rs b/server/src/database/entity/chunkref.rs new file mode 100644 index 0000000..e0c5a8c --- /dev/null +++ b/server/src/database/entity/chunkref.rs @@ -0,0 +1,84 @@ +//! A reference binding a NAR and a chunk. +//! +//! A NAR is backed by a sequence of chunks. +//! +//! A chunk may become unavailable (e.g., disk corruption) and +//! removed from the database, in which case all dependent NARs +//! will become unavailable. +//! +//! Such scenario can be recovered from by reuploading any object +//! that has the missing chunk. `atticadm` will have the functionality +//! to kill/delete a corrupted chunk from the database and to find +//! objects with missing chunks so they can be repaired. + +use sea_orm::entity::prelude::*; + +pub type ChunkRefModel = Model; + +/// A reference binding a NAR to a chunk. +#[derive(Debug, Clone, PartialEq, Eq, DeriveEntityModel)] +#[sea_orm(table_name = "chunkref")] +pub struct Model { + /// Unique numeric ID of the link. + #[sea_orm(primary_key)] + pub id: i64, + + /// ID of the NAR. + #[sea_orm(indexed)] + pub nar_id: i64, + + /// The zero-indexed sequence number of the chunk. 
+ pub seq: i32, + + /// ID of the chunk. + /// + /// This may be NULL when the chunk is missing from the + /// database. + #[sea_orm(indexed)] + pub chunk_id: Option, + + /// The hash of the uncompressed chunk. + /// + /// This always begins with "sha256:" with the hash in the + /// hexadecimal format. + /// + /// This is used for recovering from a missing chunk. + #[sea_orm(indexed)] + pub chunk_hash: String, + + /// The compression of the compressed chunk. + /// + /// This is used for recovering from a missing chunk. + pub compression: String, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "super::chunk::Entity", + from = "Column::ChunkId", + to = "super::chunk::Column::Id" + )] + Chunk, + + #[sea_orm( + belongs_to = "super::nar::Entity", + from = "Column::NarId", + to = "super::nar::Column::Id" + )] + Nar, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Chunk.def() + } +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Nar.def() + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/server/src/database/entity/mod.rs b/server/src/database/entity/mod.rs index 6f6acb3..8a4957a 100644 --- a/server/src/database/entity/mod.rs +++ b/server/src/database/entity/mod.rs @@ -3,6 +3,8 @@ //! We use SeaORM and target PostgreSQL (production) and SQLite (development). pub mod cache; +pub mod chunk; +pub mod chunkref; pub mod nar; pub mod object; diff --git a/server/src/database/entity/nar.rs b/server/src/database/entity/nar.rs index fe404ef..01f29bc 100644 --- a/server/src/database/entity/nar.rs +++ b/server/src/database/entity/nar.rs @@ -2,9 +2,6 @@ use sea_orm::entity::prelude::*; -use super::Json; -use crate::storage::RemoteFile; - pub type NarModel = Model; /// The state of a NAR. @@ -35,6 +32,7 @@ pub enum NarState { /// The NAR is being deleted. /// /// This row will be deleted shortly. + /// This variant is no longer used since the actual storage is managed as chunks. #[sea_orm(string_value = "D")] Deleted, } @@ -44,6 +42,27 @@ pub enum NarState { /// A NAR without `nix-store --export` metadata is context-free, /// meaning that it's not associated with a store path and only /// depends on its contents. +/// +/// ## NAR Repair +/// +/// After a NAR is transitioned into the `Valid` state, its list +/// of constituent chunks in `chunkref` is immutable. When a client +/// uploads an existing NAR and the NAR has unavailable chunks, +/// a new `nar` entry is created and all dependent `object` rows +/// will have the `nar_id` updated. The old `nar` entry will +/// be garbage-collected. +/// +/// Why don't we just fill in the missing chunks in the existing +/// `nar`? Because the NAR stream from the client _might_ be chunked +/// differently. This is not supposed to happen since FastCDC +/// has a deterministic lookup table for cut-point judgment, however +/// we want the system to tolerate different chunking behavior because +/// of table changes, for example. +/// +/// However, when a chunk is added, all broken `chunkref`s with +/// the same `chunk_hash` _are_ repaired. In other words, by +/// re-uploading a broken NAR you are helping other NARs with +/// the same broken chunk. #[derive(Debug, Clone, PartialEq, Eq, DeriveEntityModel)] #[sea_orm(table_name = "nar")] pub struct Model { @@ -70,31 +89,24 @@ pub struct Model { /// The size of the NAR archive. pub nar_size: i64, - /// The hash of the compressed file. 
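// Sketch (not part of the patch) of how the new `chunkref` table can be
// queried: fetch the chunks of one NAR in `seq` order, with `None` marking a
// missing chunk. This mirrors what `get_nar` above expects from
// `find_object_and_chunks_by_store_path_hash`, whose implementation is not
// shown in this diff; the function name here is hypothetical.
use sea_orm::entity::prelude::*;
use sea_orm::{DatabaseConnection, DbErr, QueryOrder};

use crate::database::entity::chunk::{self, ChunkModel};
use crate::database::entity::chunkref::{self, Entity as ChunkRef};

async fn chunks_of_nar(
    db: &DatabaseConnection,
    nar_id: i64,
) -> Result<Vec<Option<ChunkModel>>, DbErr> {
    let rows = ChunkRef::find()
        .filter(chunkref::Column::NarId.eq(nar_id))
        .order_by_asc(chunkref::Column::Seq)
        // Uses the `Related<chunk::Entity>` impl defined above; the chunk is
        // `None` when `chunk_id` is NULL (i.e. the chunk went missing).
        .find_also_related(chunk::Entity)
        .all(db)
        .await?;

    Ok(rows.into_iter().map(|(_chunkref, chunk)| chunk).collect())
}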
- /// - /// This always begins with "sha256:" with the hash in the - /// hexadecimal format. - /// - /// This field may not be available if the file hashes aren't - /// confirmed. - pub file_hash: Option, - - /// The size of the compressed file. - /// - /// This field may not be available if the file hashes aren't - /// confirmed. - pub file_size: Option, - /// The type of compression in use. #[sea_orm(column_type = "String(Some(10))")] pub compression: String, - /// The remote file backing this NAR. - pub remote_file: Json, + /// Number of chunks that make up this NAR. + pub num_chunks: i32, - /// Unique string identifying the remote file. - #[sea_orm(unique)] - pub remote_file_id: String, + /// Hint indicating whether all chunks making up this NAR are available. + /// + /// This is used by the `get-missing-paths` endpoint to + /// also return store paths that are inaccessible due to + /// missing chunks in the associated NARs. They can then be + /// repaired by any client uploading. + /// + /// This flag may be outdated, but it's okay since when a client + /// tries to upload the same NAR, it will be immediately deduplicated + /// if all chunks are present and the flag will be updated. + pub completeness_hint: bool, /// Number of processes holding this NAR. /// @@ -111,6 +123,9 @@ pub struct Model { pub enum Relation { #[sea_orm(has_many = "super::object::Entity")] Object, + + #[sea_orm(has_many = "super::chunkref::Entity")] + ChunkRef, } impl ActiveModelBehavior for ActiveModel {} diff --git a/server/src/database/entity/object.rs b/server/src/database/entity/object.rs index da6233f..37ad893 100644 --- a/server/src/database/entity/object.rs +++ b/server/src/database/entity/object.rs @@ -90,12 +90,6 @@ pub enum Relation { impl Model { /// Converts this object to a NarInfo. 
pub fn to_nar_info(&self, nar: &NarModel) -> ServerResult { - // FIXME: Return Err if file_hash and file_size don't exist - let file_size = nar - .file_size - .unwrap() - .try_into() - .map_err(ServerError::database_error)?; let nar_size = nar .nar_size .try_into() @@ -106,8 +100,8 @@ impl Model { url: format!("nar/{}.nar", self.store_path_hash.as_str()), compression: Compression::from_str(&nar.compression)?, - file_hash: Hash::from_typed(nar.file_hash.as_ref().unwrap())?, - file_size, + file_hash: None, // FIXME + file_size: None, // FIXME nar_hash: Hash::from_typed(&nar.nar_hash)?, nar_size, system: self.system.to_owned(), diff --git a/server/src/database/migration/m20221227_000002_create_nar_table.rs b/server/src/database/migration/m20221227_000002_create_nar_table.rs index bcc2dd8..875c451 100644 --- a/server/src/database/migration/m20221227_000002_create_nar_table.rs +++ b/server/src/database/migration/m20221227_000002_create_nar_table.rs @@ -33,12 +33,16 @@ impl MigrationTrait for Migration { ) .col(ColumnDef::new(Column::NarHash).string().not_null()) .col(ColumnDef::new(Column::NarSize).big_integer().not_null()) - .col(ColumnDef::new(Column::FileHash).string().null()) - .col(ColumnDef::new(Column::FileSize).big_integer().null()) + .col(ColumnDef::new(Alias::new("file_hash")).string().null()) + .col(ColumnDef::new(Alias::new("file_size")).big_integer().null()) .col(ColumnDef::new(Column::Compression).string().not_null()) - .col(ColumnDef::new(Column::RemoteFile).string().not_null()) .col( - ColumnDef::new(Column::RemoteFileId) + ColumnDef::new(Alias::new("remote_file")) + .string() + .not_null(), + ) + .col( + ColumnDef::new(Alias::new("remote_file_id")) .string() .not_null() .unique_key(), diff --git a/server/src/database/migration/m20230112_000001_add_chunk_table.rs b/server/src/database/migration/m20230112_000001_add_chunk_table.rs new file mode 100644 index 0000000..3aeef94 --- /dev/null +++ b/server/src/database/migration/m20230112_000001_add_chunk_table.rs @@ -0,0 +1,70 @@ +use sea_orm_migration::prelude::*; + +use crate::database::entity::chunk::*; + +pub struct Migration; + +impl MigrationName for Migration { + fn name(&self) -> &str { + "m20230112_000001_add_chunk_table" + } +} + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(Entity) + .col( + ColumnDef::new(Column::Id) + .big_integer() + .not_null() + .auto_increment() + .primary_key(), + ) + .col( + ColumnDef::new(Column::State) + .r#char() + .char_len(1) + .not_null(), + ) + .col(ColumnDef::new(Column::ChunkHash).string().not_null()) + .col(ColumnDef::new(Column::ChunkSize).big_integer().not_null()) + .col(ColumnDef::new(Alias::new("file_hash")).string().null()) + .col(ColumnDef::new(Alias::new("file_size")).big_integer().null()) + .col(ColumnDef::new(Column::Compression).string().not_null()) + .col(ColumnDef::new(Column::RemoteFile).string().not_null()) + .col( + ColumnDef::new(Column::RemoteFileId) + .string() + .not_null() + .unique_key(), + ) + .col( + ColumnDef::new(Column::HoldersCount) + .integer() + .not_null() + .default(0), + ) + .col( + ColumnDef::new(Column::CreatedAt) + .timestamp_with_time_zone() + .not_null(), + ) + .to_owned(), + ) + .await?; + + manager + .create_index( + Index::create() + .name("idx-chunk-chunk-hash") + .table(Entity) + .col(Column::ChunkHash) + .to_owned(), + ) + .await + } +} diff --git 
a/server/src/database/migration/m20230112_000002_add_chunkref_table.rs b/server/src/database/migration/m20230112_000002_add_chunkref_table.rs new file mode 100644 index 0000000..2153841 --- /dev/null +++ b/server/src/database/migration/m20230112_000002_add_chunkref_table.rs @@ -0,0 +1,76 @@ +use sea_orm_migration::prelude::*; + +use crate::database::entity::chunk; +use crate::database::entity::chunkref::*; +use crate::database::entity::nar; + +pub struct Migration; + +impl MigrationName for Migration { + fn name(&self) -> &str { + "m20230112_000002_add_chunkref_table" + } +} + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(Entity) + .col( + ColumnDef::new(Column::Id) + .big_integer() + .not_null() + .auto_increment() + .primary_key(), + ) + .col(ColumnDef::new(Column::NarId).big_integer().not_null()) + .col(ColumnDef::new(Column::Seq).integer().not_null()) + .col(ColumnDef::new(Column::ChunkId).big_integer().null()) + .col(ColumnDef::new(Column::ChunkHash).string().not_null()) + .col(ColumnDef::new(Column::Compression).string().not_null()) + .foreign_key( + ForeignKeyCreateStatement::new() + .name("fk_chunkref_chunk") + .from_tbl(Entity) + .from_col(Column::ChunkId) + .to_tbl(chunk::Entity) + .to_col(chunk::Column::Id) + .on_delete(ForeignKeyAction::SetNull), + ) + .foreign_key( + ForeignKeyCreateStatement::new() + .name("fk_chunkref_nar") + .from_tbl(Entity) + .from_col(Column::NarId) + .to_tbl(nar::Entity) + .to_col(nar::Column::Id) + .on_delete(ForeignKeyAction::Cascade), + ) + .to_owned(), + ) + .await?; + + manager + .create_index( + Index::create() + .name("idx-chunk-nar-id") + .table(Entity) + .col(Column::NarId) + .to_owned(), + ) + .await?; + + manager + .create_index( + Index::create() + .name("idx-chunk-chunk-id") + .table(Entity) + .col(Column::ChunkId) + .to_owned(), + ) + .await + } +} diff --git a/server/src/database/migration/m20230112_000003_add_nar_num_chunks.rs b/server/src/database/migration/m20230112_000003_add_nar_num_chunks.rs new file mode 100644 index 0000000..bbf9fb7 --- /dev/null +++ b/server/src/database/migration/m20230112_000003_add_nar_num_chunks.rs @@ -0,0 +1,32 @@ +use sea_orm_migration::prelude::*; + +use crate::database::entity::nar::*; + +pub struct Migration; + +impl MigrationName for Migration { + fn name(&self) -> &str { + "m20230112_000003_add_nar_num_chunks" + } +} + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Entity) + .add_column( + ColumnDef::new(Column::NumChunks) + .integer() + .not_null() + .default(1), + ) + .to_owned(), + ) + .await?; + + Ok(()) + } +} diff --git a/server/src/database/migration/m20230112_000004_migrate_nar_remote_files_to_chunks.rs b/server/src/database/migration/m20230112_000004_migrate_nar_remote_files_to_chunks.rs new file mode 100644 index 0000000..42d70a6 --- /dev/null +++ b/server/src/database/migration/m20230112_000004_migrate_nar_remote_files_to_chunks.rs @@ -0,0 +1,143 @@ +use sea_orm::{ConnectionTrait, TransactionTrait}; +use sea_orm_migration::prelude::*; + +use crate::database::entity::chunk; +use crate::database::entity::chunkref; +use crate::database::entity::nar; + +pub struct Migration; + +pub enum TempChunkCols { + /// The ID of the NAR. 
+ NarId, +} + +impl MigrationName for Migration { + fn name(&self) -> &str { + "m20230112_000004_migrate_nar_remote_files_to_chunks" + } +} + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // When this migration is run, we assume that there are no + // preexisting chunks. + + eprintln!("* Migrating NARs to chunks..."); + + // Add a temporary column into `chunk` to store the related `nar_id`. + manager + .alter_table( + Table::alter() + .table(chunk::Entity) + .add_column_if_not_exists( + ColumnDef::new(TempChunkCols::NarId).integer().not_null(), + ) + .to_owned(), + ) + .await?; + + // Get the original values from NARs + let select_remote_file = Query::select() + .from(nar::Entity) + .columns([ + nar::Column::Id.into_iden(), + Alias::new("remote_file").into_iden(), + Alias::new("remote_file_id").into_iden(), + nar::Column::NarHash.into_iden(), + nar::Column::NarSize.into_iden(), + Alias::new("file_hash").into_iden(), + Alias::new("file_size").into_iden(), + nar::Column::Compression.into_iden(), + nar::Column::CreatedAt.into_iden(), + ]) + .expr_as(chunk::ChunkState::Valid, chunk::Column::State.into_iden()) + .to_owned(); + + // ... insert them into the `chunk` table + let insert_chunk = Query::insert() + .into_table(chunk::Entity) + .columns([ + TempChunkCols::NarId.into_iden(), + chunk::Column::RemoteFile.into_iden(), + chunk::Column::RemoteFileId.into_iden(), + chunk::Column::ChunkHash.into_iden(), + chunk::Column::ChunkSize.into_iden(), + chunk::Column::FileHash.into_iden(), + chunk::Column::FileSize.into_iden(), + chunk::Column::Compression.into_iden(), + chunk::Column::CreatedAt.into_iden(), + chunk::Column::State.into_iden(), + ]) + .select_from(select_remote_file) + .unwrap() + .returning(Query::returning().columns([ + chunk::Column::Id.into_column_ref(), + TempChunkCols::NarId.into_column_ref(), + ])) + .to_owned(); + + let insert_chunk_stmt = manager.get_database_backend().build(&insert_chunk); + + // ... 
then create chunkrefs binding the chunks and original NARs + let select_chunk = Query::select() + .from(chunk::Entity) + .columns([ + chunk::Column::Id.into_iden(), + TempChunkCols::NarId.into_iden(), + chunk::Column::ChunkHash.into_iden(), + chunk::Column::Compression.into_iden(), + ]) + .expr_as(0, chunkref::Column::Seq.into_iden()) + .to_owned(); + + let insert_chunkref = Query::insert() + .into_table(chunkref::Entity) + .columns([ + chunkref::Column::ChunkId.into_iden(), + chunkref::Column::NarId.into_iden(), + chunkref::Column::ChunkHash.into_iden(), + chunkref::Column::Compression.into_iden(), + chunkref::Column::Seq.into_iden(), + ]) + .select_from(select_chunk) + .unwrap() + .returning(Query::returning().columns([chunkref::Column::Id.into_column_ref()])) + .to_owned(); + + let insert_chunkref_stmt = manager.get_database_backend().build(&insert_chunkref); + + // Actually run the migration + let txn = manager.get_connection().begin().await?; + txn.execute(insert_chunk_stmt).await?; + txn.execute(insert_chunkref_stmt).await?; + txn.commit().await?; + + // Finally, drop the temporary column + manager + .alter_table( + Table::alter() + .table(chunk::Entity) + .drop_column(TempChunkCols::NarId) + .to_owned(), + ) + .await?; + + // We will drop the unused columns in `nar` in the next migration + Ok(()) + } +} + +impl Iden for TempChunkCols { + fn unquoted(&self, s: &mut dyn std::fmt::Write) { + write!( + s, + "{}", + match self { + Self::NarId => "temp_nar_id", + } + ) + .unwrap(); + } +} diff --git a/server/src/database/migration/m20230112_000005_drop_old_nar_columns.rs b/server/src/database/migration/m20230112_000005_drop_old_nar_columns.rs new file mode 100644 index 0000000..9d29b66 --- /dev/null +++ b/server/src/database/migration/m20230112_000005_drop_old_nar_columns.rs @@ -0,0 +1,159 @@ +use sea_orm::{ConnectionTrait, DatabaseBackend, Statement}; +use sea_orm_migration::prelude::*; + +use crate::database::entity::nar::{self, *}; + +pub struct Migration; + +const TEMP_NAR_TABLE: &str = "nar_new"; + +impl MigrationName for Migration { + fn name(&self) -> &str { + "m20230112_000005_drop_old_nar_columns" + } +} + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + eprintln!("* Migrating NAR schema..."); + + if manager.get_database_backend() == DatabaseBackend::Sqlite { + // Just copy all data to a new table + manager + .get_connection() + .execute(Statement::from_string( + manager.get_database_backend(), + "PRAGMA foreign_keys = OFF".to_owned(), + )) + .await?; + + manager + .create_table( + Table::create() + .table(Alias::new(TEMP_NAR_TABLE)) + .if_not_exists() + .col( + ColumnDef::new(Column::Id) + .big_integer() + .not_null() + .auto_increment() + .primary_key(), + ) + .col( + ColumnDef::new(Column::State) + .r#char() + .char_len(1) + .not_null(), + ) + .col(ColumnDef::new(Column::NarHash).string().not_null()) + .col(ColumnDef::new(Column::NarSize).big_integer().not_null()) + .col(ColumnDef::new(Column::Compression).string().not_null()) + .col( + ColumnDef::new(Column::NumChunks) + .integer() + .not_null() + .default(1), + ) + .col( + ColumnDef::new(Column::HoldersCount) + .integer() + .not_null() + .default(0), + ) + .col( + ColumnDef::new(Column::CreatedAt) + .timestamp_with_time_zone() + .not_null(), + ) + .to_owned(), + ) + .await?; + + let columns = [ + nar::Column::Id.into_iden(), + nar::Column::State.into_iden(), + nar::Column::NarHash.into_iden(), + nar::Column::NarSize.into_iden(), + 
nar::Column::Compression.into_iden(), + nar::Column::NumChunks.into_iden(), + nar::Column::HoldersCount.into_iden(), + nar::Column::CreatedAt.into_iden(), + ]; + + let select_nar = Query::select() + .from(nar::Entity) + .columns(columns.clone()) + .to_owned(); + + let insertion = Query::insert() + .into_table(Alias::new(TEMP_NAR_TABLE)) + .columns(columns.clone()) + .select_from(select_nar) + .unwrap() + .to_owned(); + + let insertion_stmt = manager.get_database_backend().build(&insertion); + manager.get_connection().execute(insertion_stmt).await?; + + manager + .drop_table(Table::drop().table(nar::Entity).to_owned()) + .await?; + + manager + .rename_table( + Table::rename() + .table(Alias::new(TEMP_NAR_TABLE), nar::Entity) + .to_owned(), + ) + .await?; + + manager + .get_connection() + .execute(Statement::from_string( + manager.get_database_backend(), + "PRAGMA foreign_keys = ON".to_owned(), + )) + .await?; + } else { + // Just drop the columns + manager + .alter_table( + Table::alter() + .table(nar::Entity) + .drop_column(Alias::new("file_hash")) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(nar::Entity) + .drop_column(Alias::new("file_size")) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(nar::Entity) + .drop_column(Alias::new("remote_file")) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(nar::Entity) + .drop_column(Alias::new("remote_file_id")) + .to_owned(), + ) + .await?; + } + + Ok(()) + } +} diff --git a/server/src/database/migration/m20230112_000006_add_nar_completeness_hint.rs b/server/src/database/migration/m20230112_000006_add_nar_completeness_hint.rs new file mode 100644 index 0000000..d3f3a15 --- /dev/null +++ b/server/src/database/migration/m20230112_000006_add_nar_completeness_hint.rs @@ -0,0 +1,32 @@ +use sea_orm_migration::prelude::*; + +use crate::database::entity::nar::*; + +pub struct Migration; + +impl MigrationName for Migration { + fn name(&self) -> &str { + "m20230112_000006_add_nar_completeness_hint" + } +} + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Entity) + .add_column( + ColumnDef::new(Column::CompletenessHint) + .boolean() + .not_null() + .default(true), + ) + .to_owned(), + ) + .await?; + + Ok(()) + } +} diff --git a/server/src/database/migration/mod.rs b/server/src/database/migration/mod.rs index b42033f..406e709 100644 --- a/server/src/database/migration/mod.rs +++ b/server/src/database/migration/mod.rs @@ -8,6 +8,12 @@ mod m20221227_000003_create_object_table; mod m20221227_000004_add_object_last_accessed; mod m20221227_000005_add_cache_retention_period; mod m20230103_000001_add_object_created_by; +mod m20230112_000001_add_chunk_table; +mod m20230112_000002_add_chunkref_table; +mod m20230112_000003_add_nar_num_chunks; +mod m20230112_000004_migrate_nar_remote_files_to_chunks; +mod m20230112_000005_drop_old_nar_columns; +mod m20230112_000006_add_nar_completeness_hint; pub struct Migrator; @@ -21,6 +27,12 @@ impl MigratorTrait for Migrator { Box::new(m20221227_000004_add_object_last_accessed::Migration), Box::new(m20221227_000005_add_cache_retention_period::Migration), Box::new(m20230103_000001_add_object_created_by::Migration), + Box::new(m20230112_000001_add_chunk_table::Migration), + Box::new(m20230112_000002_add_chunkref_table::Migration), + Box::new(m20230112_000003_add_nar_num_chunks::Migration), + 
Box::new(m20230112_000004_migrate_nar_remote_files_to_chunks::Migration),
+            Box::new(m20230112_000005_drop_old_nar_columns::Migration),
+            Box::new(m20230112_000006_add_nar_completeness_hint::Migration),
         ]
     }
 }
diff --git a/server/src/database/mod.rs b/server/src/database/mod.rs
index 1f2ab06..b7b144c 100644
--- a/server/src/database/mod.rs
+++ b/server/src/database/mod.rs
@@ -3,35 +3,43 @@ pub mod migration;
 use std::ops::Deref;
+use anyhow::anyhow;
 use async_trait::async_trait;
 use chrono::Utc;
 use sea_orm::entity::prelude::*;
 use sea_orm::entity::Iterable as EnumIterable;
-use sea_orm::query::{JoinType, QuerySelect, QueryTrait};
+use sea_orm::query::{JoinType, QueryOrder, QuerySelect, QueryTrait};
 use sea_orm::sea_query::{Expr, LockBehavior, LockType, Query, Value};
 use sea_orm::{ActiveValue::Set, ConnectionTrait, DatabaseConnection, FromQueryResult};
 use tokio::task;
 use crate::error::{ErrorKind, ServerError, ServerResult};
+use crate::narinfo::Compression;
 use attic::cache::CacheName;
 use attic::hash::Hash;
 use attic::nix_store::StorePathHash;
 use entity::cache::{self, CacheModel, Entity as Cache};
+use entity::chunk::{self, ChunkModel, ChunkState, Entity as Chunk};
+use entity::chunkref;
 use entity::nar::{self, Entity as Nar, NarModel, NarState};
 use entity::object::{self, Entity as Object, ObjectModel};
+// quintuple join time
 const SELECT_OBJECT: &str = "O_";
 const SELECT_CACHE: &str = "C_";
 const SELECT_NAR: &str = "N_";
+const SELECT_CHUNK: &str = "CH_";
+const SELECT_CHUNKREF: &str = "CHR_";
 #[async_trait]
 pub trait AtticDatabase: Send + Sync {
-    /// Retrieves an object in a binary cache by its store path hash.
-    async fn find_object_by_store_path_hash(
+    /// Retrieves an object in a binary cache by its store path hash, returning all its
+    /// chunks.
+    async fn find_object_and_chunks_by_store_path_hash(
         &self,
         cache: &CacheName,
         store_path_hash: &StorePathHash,
-    ) -> ServerResult<(ObjectModel, CacheModel, NarModel)>;
+    ) -> ServerResult<(ObjectModel, CacheModel, NarModel, Vec<Option<ChunkModel>>)>;
     /// Retrieves a binary cache.
     async fn find_cache(&self, cache: &CacheName) -> ServerResult<CacheModel>;
@@ -39,6 +47,13 @@ pub trait AtticDatabase: Send + Sync {
     /// Retrieves and locks a valid NAR matching a NAR Hash.
     async fn find_and_lock_nar(&self, nar_hash: &Hash) -> ServerResult<Option<NarGuard>>;
+    /// Retrieves and locks a valid chunk matching a chunk Hash.
+    async fn find_and_lock_chunk(
+        &self,
+        chunk_hash: &Hash,
+        compression: Compression,
+    ) -> ServerResult<Option<ChunkGuard>>;
+
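`find_and_lock_chunk` is the hook for chunk-level deduplication: while the returned guard is alive, the chunk's `holders_count` stays above zero, and the orphan-chunk GC pass later in this patch refuses to touch such rows. A minimal sketch of how an upload path might use it; the surrounding handler and the names `database`, `chunk_hash` and `compression` are illustrative assumptions, not part of this patch:

```rust
// Illustrative fragment only. Assumes it runs inside an async upload handler
// that returns a ServerResult and has `database`, `chunk_hash` and
// `compression` in scope.
if let Some(existing) = database.find_and_lock_chunk(&chunk_hash, compression).await? {
    // A valid chunk with the same hash and compression already exists.
    // `ChunkGuard` derefs to `ChunkModel`, so `existing.id` is the row to
    // point a new `chunkref` at instead of uploading the data again.
    let _chunk_id = existing.id;
    // ... insert the chunkref, then drop the guard to release the hold ...
} else {
    // No existing chunk; upload the data and insert a fresh `chunk` row.
}
```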
    /// Bumps the last accessed timestamp of an object.
     async fn bump_object_last_accessed(&self, object_id: i64) -> ServerResult<()>;
 }
@@ -48,6 +63,11 @@ pub struct NarGuard {
     nar: NarModel,
 }
+pub struct ChunkGuard {
+    database: DatabaseConnection,
+    chunk: ChunkModel,
+}
+
 fn prefix_column<E: EntityTrait, S: QuerySelect>(mut select: S, prefix: &str) -> S {
     for col in <E::Column as EnumIterable>::iter() {
         let alias = format!("{}{}", prefix, Iden::to_string(&col));
@@ -57,47 +77,139 @@
 }
 pub fn build_cache_object_nar_query() -> Select<Object> {
+    /*
+    Build something like:
+
+    -- chunkrefs must exist but chunks may not exist
+
+    select * from object
+    inner join cache
+        on object.cache_id = cache.id
+    inner join nar
+        on object.nar_id = nar.id
+    inner join chunkref
+        on chunkref.nar_id = nar.id
+    left join chunk
+        on chunkref.chunk_id = chunk.id
+    where
+        object.store_path_hash = 'fiwsv60kgwrfvib2nf9dkq9q8bk1h7qh' and
+        nar.state = 'V' and
+        cache.name = 'zhaofeng' and
+        cache.deleted_at is null
+
+    Returns (CacheModel, ObjectModel, NarModel, Vec<Option<ChunkModel>>)
+    where the number of elements in the Vec must be equal to `nar.num_chunks`.
+
+    If any element in the chunk `Vec` is `None`, it means the chunk is missing
+    for some reason (e.g., corrupted) and the full NAR cannot be reconstructed.
+    In such cases, .narinfo/.nar requests will return HTTP 503 and the affected
+    store paths will be treated as non-existent in `get-missing-paths` so they
+    can be repaired automatically when any client uploads a path containing the
+    missing chunk.
+
+    It's a quintuple join and the query plans look reasonable on SQLite
+    and Postgres. For each .narinfo/.nar request, we only submit a single query.
+    */
     let mut query = Object::find()
         .select_only()
-        .join(JoinType::LeftJoin, object::Relation::Cache.def())
-        .join(JoinType::LeftJoin, object::Relation::Nar.def());
+        .join(JoinType::InnerJoin, object::Relation::Cache.def())
+        .join(JoinType::InnerJoin, object::Relation::Nar.def())
+        .join(JoinType::InnerJoin, nar::Relation::ChunkRef.def())
+        .join(JoinType::LeftJoin, chunkref::Relation::Chunk.def())
+        .order_by_asc(chunkref::Column::Seq);
     query = prefix_column::<object::Entity, _>(query, SELECT_OBJECT);
     query = prefix_column::<cache::Entity, _>(query, SELECT_CACHE);
     query = prefix_column::<nar::Entity, _>(query, SELECT_NAR);
+    query = prefix_column::<chunk::Entity, _>(query, SELECT_CHUNK);
+    query = prefix_column::<chunkref::Entity, _>(query, SELECT_CHUNKREF);
     query
 }
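Since every `.narinfo`/`.nar` request goes through this query, callers are expected to turn any `None` chunk into the new `IncompleteNar` error so clients see a 503 instead of a truncated NAR. A minimal sketch of that check, reusing the imports already in this module (the helper itself is illustrative, not part of this patch):

```rust
// Illustrative helper: fetch an object and require every chunk to be present.
async fn find_complete_nar(
    db: &dyn AtticDatabase,
    cache: &CacheName,
    store_path_hash: &StorePathHash,
) -> ServerResult<(ObjectModel, CacheModel, NarModel, Vec<ChunkModel>)> {
    let (object, cache_model, nar, chunks) = db
        .find_object_and_chunks_by_store_path_hash(cache, store_path_hash)
        .await?;

    // A `None` entry means a chunkref points at a chunk row that was deleted
    // or never became valid, so the NAR cannot be reconstructed right now.
    let chunks = match chunks.into_iter().collect::<Option<Vec<_>>>() {
        Some(chunks) => chunks,
        None => return Err(ErrorKind::IncompleteNar.into()),
    };

    Ok((object, cache_model, nar, chunks))
}
```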
 #[async_trait]
 impl AtticDatabase for DatabaseConnection {
-    async fn find_object_by_store_path_hash(
+    async fn find_object_and_chunks_by_store_path_hash(
         &self,
         cache: &CacheName,
         store_path_hash: &StorePathHash,
-    ) -> ServerResult<(ObjectModel, CacheModel, NarModel)> {
+    ) -> ServerResult<(ObjectModel, CacheModel, NarModel, Vec<Option<ChunkModel>>)> {
         let stmt = build_cache_object_nar_query()
             .filter(cache::Column::Name.eq(cache.as_str()))
             .filter(cache::Column::DeletedAt.is_null())
             .filter(object::Column::StorePathHash.eq(store_path_hash.as_str()))
             .filter(nar::Column::State.eq(NarState::Valid))
-            .limit(1)
+            .filter(
+                chunk::Column::State
+                    .eq(ChunkState::Valid)
+                    .or(chunk::Column::State.is_null()),
+            )
             .build(self.get_database_backend());
-        let result = self
-            .query_one(stmt)
+        let results = self
+            .query_all(stmt)
             .await
-            .map_err(ServerError::database_error)?
-            .ok_or(ErrorKind::NoSuchObject)?;
-
-        let object = object::Model::from_query_result(&result, SELECT_OBJECT)
-            .map_err(ServerError::database_error)?;
-        let cache = cache::Model::from_query_result(&result, SELECT_CACHE)
-            .map_err(ServerError::database_error)?;
-        let nar = nar::Model::from_query_result(&result, SELECT_NAR)
             .map_err(ServerError::database_error)?;
-        Ok((object, cache, nar))
+        if results.is_empty() {
+            return Err(ErrorKind::NoSuchObject.into());
+        }
+
+        let mut it = results.iter();
+        let first = it.next().unwrap();
+
+        let mut chunks = Vec::new();
+
+        let object = object::Model::from_query_result(first, SELECT_OBJECT)
+            .map_err(ServerError::database_error)?;
+        let cache = cache::Model::from_query_result(first, SELECT_CACHE)
+            .map_err(ServerError::database_error)?;
+        let nar = nar::Model::from_query_result(first, SELECT_NAR)
+            .map_err(ServerError::database_error)?;
+
+        if results.len() != nar.num_chunks as usize {
+            // Something went terribly wrong. This means there is a wrong number of `chunkref` rows.
+            return Err(ErrorKind::DatabaseError(anyhow!(
+                "Database returned the wrong number of chunks: Expected {}, got {}",
+                nar.num_chunks,
+                results.len()
+            ))
+            .into());
+        }
+
+        chunks.push({
+            let chunk_id: Option<i64> = first
+                .try_get(SELECT_CHUNK, chunk::Column::Id.as_str())
+                .map_err(ServerError::database_error)?;
+
+            if chunk_id.is_some() {
+                Some(
+                    chunk::Model::from_query_result(first, SELECT_CHUNK)
+                        .map_err(ServerError::database_error)?,
+                )
+            } else {
+                None
+            }
+        });
+
+        for chunk in it {
+            chunks.push({
+                let chunk_id: Option<i64> = chunk
+                    .try_get(SELECT_CHUNK, chunk::Column::Id.as_str())
+                    .map_err(ServerError::database_error)?;
+
+                if chunk_id.is_some() {
+                    Some(
+                        chunk::Model::from_query_result(chunk, SELECT_CHUNK)
+                            .map_err(ServerError::database_error)?,
+                    )
+                } else {
+                    None
+                }
+            });
+        }
+
+        Ok((object, cache, nar, chunks))
     }
     async fn find_cache(&self, cache: &CacheName) -> ServerResult<CacheModel> {
@@ -143,6 +255,45 @@ impl AtticDatabase for DatabaseConnection {
         Ok(guard)
     }
+    // FIXME: Repetition
+    async fn find_and_lock_chunk(
+        &self,
+        chunk_hash: &Hash,
+        compression: Compression,
+    ) -> ServerResult<Option<ChunkGuard>> {
+        let one = Value::Unsigned(Some(1));
+        let matched_ids = Query::select()
+            .from(Chunk)
+            .and_where(chunk::Column::ChunkHash.eq(chunk_hash.to_typed_base16()))
+            .and_where(chunk::Column::State.eq(ChunkState::Valid))
+            .and_where(chunk::Column::Compression.eq(compression.as_str()))
+            .expr(Expr::col(chunk::Column::Id))
+            .lock_with_behavior(LockType::Update, LockBehavior::SkipLocked)
+            .limit(1)
+            .to_owned();
+        let incr_holders = Query::update()
+            .table(Chunk)
+            .values([(
+                chunk::Column::HoldersCount,
+                Expr::col(chunk::Column::HoldersCount).add(one),
+            )])
+            .and_where(chunk::Column::Id.in_subquery(matched_ids))
+            .returning_all()
+            .to_owned();
+        let stmt = self.get_database_backend().build(&incr_holders);
+
+        let guard = chunk::Model::find_by_statement(stmt)
+            .one(self)
+            .await
+            .map_err(ServerError::database_error)?
+ .map(|chunk| ChunkGuard { + database: self.clone(), + chunk, + }); + + Ok(guard) + } + async fn bump_object_last_accessed(&self, object_id: i64) -> ServerResult<()> { let now = Utc::now(); @@ -192,3 +343,43 @@ impl Drop for NarGuard { }); } } + +impl ChunkGuard { + pub fn from_locked(database: DatabaseConnection, chunk: ChunkModel) -> Self { + Self { database, chunk } + } +} + +impl Deref for ChunkGuard { + type Target = ChunkModel; + + fn deref(&self) -> &Self::Target { + &self.chunk + } +} + +impl Drop for ChunkGuard { + fn drop(&mut self) { + let database = self.database.clone(); + let chunk_id = self.chunk.id; + + task::spawn(async move { + tracing::debug!("Unlocking chunk"); + + let one = Value::Unsigned(Some(1)); + let decr_holders = Query::update() + .table(Chunk) + .values([( + chunk::Column::HoldersCount, + Expr::col(chunk::Column::HoldersCount).sub(one), + )]) + .and_where(chunk::Column::Id.eq(chunk_id)) + .to_owned(); + let stmt = database.get_database_backend().build(&decr_holders); + + if let Err(e) = database.execute(stmt).await { + tracing::warn!("Failed to decrement holders count: {}", e); + } + }); + } +} diff --git a/server/src/error.rs b/server/src/error.rs index dcd559d..f5d5d41 100644 --- a/server/src/error.rs +++ b/server/src/error.rs @@ -54,6 +54,9 @@ pub enum ErrorKind { /// Invalid compression type "{name}". InvalidCompressionType { name: String }, + /// The requested NAR has missing chunks and needs to be repaired. + IncompleteNar, + /// Database error: {0} DatabaseError(AnyError), @@ -174,6 +177,7 @@ impl ErrorKind { Self::NoSuchCache => "NoSuchCache", Self::CacheAlreadyExists => "CacheAlreadyExists", Self::InvalidCompressionType { .. } => "InvalidCompressionType", + Self::IncompleteNar => "IncompleteNar", Self::AtticError(e) => e.name(), Self::DatabaseError(_) => "DatabaseError", Self::StorageError(_) => "StorageError", @@ -218,6 +222,7 @@ impl ErrorKind { Self::NoSuchCache => StatusCode::NOT_FOUND, Self::NoSuchObject => StatusCode::NOT_FOUND, Self::CacheAlreadyExists => StatusCode::BAD_REQUEST, + Self::IncompleteNar => StatusCode::SERVICE_UNAVAILABLE, Self::ManifestSerializationError(_) => StatusCode::BAD_REQUEST, Self::RequestError(_) => StatusCode::BAD_REQUEST, Self::InvalidCompressionType { .. } => StatusCode::BAD_REQUEST, diff --git a/server/src/gc.rs b/server/src/gc.rs index 1378673..7b92014 100644 --- a/server/src/gc.rs +++ b/server/src/gc.rs @@ -17,6 +17,8 @@ use tracing::instrument; use super::{State, StateInner}; use crate::config::Config; use crate::database::entity::cache::{self, Entity as Cache}; +use crate::database::entity::chunk::{self, ChunkState, Entity as Chunk}; +use crate::database::entity::chunkref::{self, Entity as ChunkRef}; use crate::database::entity::nar::{self, Entity as Nar, NarState}; use crate::database::entity::object::{self, Entity as Object}; @@ -54,6 +56,7 @@ pub async fn run_garbage_collection_once(config: Config) -> Result<()> { let state = StateInner::new(config).await; run_time_based_garbage_collection(&state).await?; run_reap_orphan_nars(&state).await?; + run_reap_orphan_chunks(&state).await?; Ok(()) } @@ -122,7 +125,6 @@ async fn run_time_based_garbage_collection(state: &State) -> Result<()> { #[instrument(skip_all)] async fn run_reap_orphan_nars(state: &State) -> Result<()> { let db = state.database().await?; - let storage = state.storage().await?; // find all orphan NARs... 
let orphan_nar_ids = Query::select() @@ -140,46 +142,78 @@ async fn run_reap_orphan_nars(state: &State) -> Result<()> { .lock_with_tables_behavior(LockType::Update, [Nar], LockBehavior::SkipLocked) .to_owned(); + // ... and simply delete them + let deletion = Nar::delete_many() + .filter(nar::Column::Id.in_subquery(orphan_nar_ids)) + .exec(db) + .await?; + + tracing::info!("Deleted {} orphan NARs", deletion.rows_affected,); + + Ok(()) +} + +#[instrument(skip_all)] +async fn run_reap_orphan_chunks(state: &State) -> Result<()> { + let db = state.database().await?; + let storage = state.storage().await?; + + // find all orphan chunks... + let orphan_chunk_ids = Query::select() + .from(Chunk) + .expr(chunk::Column::Id.into_expr()) + .left_join( + ChunkRef, + chunkref::Column::ChunkId + .into_expr() + .eq(chunk::Column::Id.into_expr()), + ) + .and_where(chunkref::Column::Id.is_null()) + .and_where(chunk::Column::State.eq(ChunkState::Valid)) + .and_where(chunk::Column::HoldersCount.eq(0)) + .lock_with_tables_behavior(LockType::Update, [Chunk], LockBehavior::SkipLocked) + .to_owned(); + // ... and transition their state to Deleted // - // Deleted NARs are essentially invisible from our normal queries + // Deleted chunks are essentially invisible from our normal queries let change_state = Query::update() - .table(Nar) - .value(nar::Column::State, NarState::Deleted) - .and_where(nar::Column::Id.in_subquery(orphan_nar_ids)) + .table(Chunk) + .value(chunk::Column::State, ChunkState::Deleted) + .and_where(chunk::Column::Id.in_subquery(orphan_chunk_ids)) .returning_all() .to_owned(); let stmt = db.get_database_backend().build(&change_state); - let orphan_nars = nar::Model::find_by_statement(stmt).all(db).await?; + let orphan_chunks = chunk::Model::find_by_statement(stmt).all(db).await?; - if orphan_nars.is_empty() { + if orphan_chunks.is_empty() { return Ok(()); } - // Delete the NARs from remote storage + // Delete the chunks from remote storage let delete_limit = Arc::new(Semaphore::new(20)); // TODO: Make this configurable - let futures: Vec<_> = orphan_nars + let futures: Vec<_> = orphan_chunks .into_iter() - .map(|nar| { + .map(|chunk| { let delete_limit = delete_limit.clone(); async move { let permit = delete_limit.acquire().await?; - storage.delete_file_db(&nar.remote_file.0).await?; + storage.delete_file_db(&chunk.remote_file.0).await?; drop(permit); - Result::<_, anyhow::Error>::Ok(nar.id) + Result::<_, anyhow::Error>::Ok(chunk.id) } }) .collect(); // Deletions can result in spurious failures, tolerate them // - // NARs that failed to be deleted from the remote storage will + // Chunks that failed to be deleted from the remote storage will // just be stuck in Deleted state. // // TODO: Maybe have an interactive command to retry deletions? 
-    let deleted_nar_ids: Vec<_> = join_all(futures)
+    let deleted_chunk_ids: Vec<_> = join_all(futures)
         .await
         .into_iter()
         .filter(|r| {
@@ -193,12 +227,12 @@ async fn run_reap_orphan_nars(state: &State) -> Result<()> {
         .collect();
     // Finally, delete them from the database
-    let deletion = Nar::delete_many()
-        .filter(nar::Column::Id.is_in(deleted_nar_ids))
+    let deletion = Chunk::delete_many()
+        .filter(chunk::Column::Id.is_in(deleted_chunk_ids))
         .exec(db)
         .await?;
-    tracing::info!("Deleted {} NARs", deletion.rows_affected);
+    tracing::info!("Deleted {} orphan chunks", deletion.rows_affected);
     Ok(())
 }
diff --git a/server/src/lib.rs b/server/src/lib.rs
index 7a22b0d..38d3b1a 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -15,9 +15,10 @@ pub mod access;
 mod api;
+mod chunking;
 pub mod config;
 pub mod database;
-mod error;
+pub mod error;
 pub mod gc;
 mod middleware;
 mod narinfo;
diff --git a/server/src/narinfo/mod.rs b/server/src/narinfo/mod.rs
index dd26210..ba104f2 100644
--- a/server/src/narinfo/mod.rs
+++ b/server/src/narinfo/mod.rs
@@ -86,12 +86,18 @@ pub struct NarInfo {
     pub compression: Compression,
     /// The hash of the compressed file.
+    ///
+    /// We don't know the file hash if it's chunked.
     #[serde(rename = "FileHash")]
-    pub file_hash: Hash,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub file_hash: Option<Hash>,
     /// The size of the compressed file.
+    ///
+    /// We may not know the file size if it's chunked.
     #[serde(rename = "FileSize")]
-    pub file_size: usize,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub file_size: Option<usize>,
     /// The hash of the NAR archive.
     ///
@@ -242,6 +248,18 @@ impl IntoResponse for NarInfo {
     }
 }
+impl Compression {
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            Self::None => "none",
+            Self::Xz => "xz",
+            Self::Bzip2 => "bzip2",
+            Self::Brotli => "br",
+            Self::Zstd => "zstd",
+        }
+    }
+}
+
 impl FromStr for Compression {
     type Err = ServerError;
@@ -262,13 +280,7 @@ impl FromStr for Compression {
 impl ToString for Compression {
     fn to_string(&self) -> String {
-        String::from(match self {
-            Self::None => "none",
-            Self::Xz => "xz",
-            Self::Bzip2 => "bzip2",
-            Self::Brotli => "br",
-            Self::Zstd => "zstd",
-        })
+        String::from(self.as_str())
     }
 }
diff --git a/server/src/narinfo/tests.rs b/server/src/narinfo/tests.rs
index de43366..f06942a 100644
--- a/server/src/narinfo/tests.rs
+++ b/server/src/narinfo/tests.rs
@@ -34,9 +34,9 @@ Sig: cache.nixos.org-1:lo9EfNIL4eGRuNh7DTbAAffWPpI2SlYC/8uP7JnhgmfRIUNGhSbFe8qEa
     assert_eq!(Compression::Xz, narinfo.compression);
     assert_eq!(
         "sha256:0nqgf15qfiacfxrgm2wkw0gwwncjqqzzalj8rs14w9srkydkjsk9",
-        narinfo.file_hash.to_typed_base32()
+        narinfo.file_hash.as_ref().unwrap().to_typed_base32()
     );
-    assert_eq!(41104, narinfo.file_size);
+    assert_eq!(Some(41104), narinfo.file_size);
     assert_eq!(
         "sha256:16mvl7v0ylzcg2n3xzjn41qhzbmgcn5iyarx16nn5l2r36n2kqci",
         narinfo.nar_hash.to_typed_base32()
diff --git a/server/src/storage/local.rs b/server/src/storage/local.rs
index 383ed4f..93ff737 100644
--- a/server/src/storage/local.rs
+++ b/server/src/storage/local.rs
@@ -87,15 +87,19 @@ impl StorageBackend for LocalBackend {
         Ok(())
     }
-    async fn download_file(&self, name: String) -> ServerResult<Download> {
+    async fn download_file(&self, name: String, _prefer_stream: bool) -> ServerResult<Download> {
         let file = File::open(self.get_path(&name))
             .await
             .map_err(ServerError::storage_error)?;
-        Ok(Download::Stream(Box::new(file)))
+        Ok(Download::AsyncRead(Box::new(file)))
     }
-    async fn download_file_db(&self, file: &RemoteFile) -> ServerResult<Download> {
+    async fn download_file_db(
+        &self,
+        file: &RemoteFile,
+        _prefer_stream: bool,
+    ) -> ServerResult<Download> {
         let file = if let RemoteFile::Local(file) = file {
             file
         } else {
@@ -109,7 +113,7 @@
             .await
             .map_err(ServerError::storage_error)?;
-        Ok(Download::Stream(Box::new(file)))
+        Ok(Download::AsyncRead(Box::new(file)))
     }
     async fn make_db_reference(&self, name: String) -> ServerResult<RemoteFile> {
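The extra `prefer_stream` argument (here on the local backend, and on the trait and S3 backend below) moves the stream-versus-redirect decision to the caller. A plausible call site, with `storage`, `nar` and `chunk` assumed to be in scope (the actual API handlers are not shown in this patch):

```rust
// Chunked NARs have to be reassembled server-side, so ask for a stream;
// a single-chunk NAR can still be handed out as a redirect where the
// backend supports it.
let prefer_stream = nar.num_chunks > 1;

let download = storage
    .download_file_db(&chunk.remote_file.0, prefer_stream)
    .await?;
```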
diff --git a/server/src/storage/mod.rs b/server/src/storage/mod.rs
index fad2b1a..37cd0c3 100644
--- a/server/src/storage/mod.rs
+++ b/server/src/storage/mod.rs
@@ -3,6 +3,8 @@ mod local;
 mod s3;
+use bytes::Bytes;
+use futures::stream::BoxStream;
 use serde::{Deserialize, Serialize};
 use tokio::io::AsyncRead;
@@ -33,11 +35,14 @@ pub enum RemoteFile {
 /// Way to download a file.
 pub enum Download {
-    /// A redirect to a (possibly ephemeral) URL.
-    Redirect(String),
+    /// A possibly ephemeral URL.
+    Url(String),
     /// A stream.
-    Stream(Box<dyn AsyncRead + Unpin + Send>),
+    Stream(BoxStream<'static, std::io::Result<Bytes>>),
+
+    /// An AsyncRead.
+    AsyncRead(Box<dyn AsyncRead + Unpin + Send>),
 }
 // TODO: Maybe make RemoteFile the one true reference instead of having two sets of APIs?
@@ -58,10 +63,14 @@ pub trait StorageBackend: Send + Sync + std::fmt::Debug {
     async fn delete_file_db(&self, file: &RemoteFile) -> ServerResult<()>;
     /// Downloads a file using the current configuration.
-    async fn download_file(&self, name: String) -> ServerResult<Download>;
+    async fn download_file(&self, name: String, prefer_stream: bool) -> ServerResult<Download>;
     /// Downloads a file using a database reference.
-    async fn download_file_db(&self, file: &RemoteFile) -> ServerResult<Download>;
+    async fn download_file_db(
+        &self,
+        file: &RemoteFile,
+        prefer_stream: bool,
+    ) -> ServerResult<Download>;
     /// Creates a database reference for a file.
     async fn make_db_reference(&self, name: String) -> ServerResult<RemoteFile>;
diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs
index 494a840..bec687b 100644
--- a/server/src/storage/s3.rs
+++ b/server/src/storage/s3.rs
@@ -1,13 +1,18 @@
 //! S3 remote files.
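With three delivery modes instead of two, download handlers need a small adapter. The sketch below is an assumption about how such a handler might normalize a `Download`; the `NarBody` type, the function, and the use of `tokio_util::io::ReaderStream` (which relies on tokio-util's `io` feature) are illustrative, not part of this patch:

```rust
use bytes::Bytes;
use futures::stream::BoxStream;
use tokio_util::io::ReaderStream;

/// Hypothetical normalized body for a .nar response.
enum NarBody {
    /// Redirect the client to a (possibly presigned) URL.
    Redirect(String),
    /// Proxy the bytes through the server, e.g. so chunks can be merged.
    Stream(BoxStream<'static, std::io::Result<Bytes>>),
}

fn into_nar_body(download: Download) -> NarBody {
    match download {
        // Single-chunk NARs on S3 can still be redirected.
        Download::Url(url) => NarBody::Redirect(url),
        // S3 with `prefer_stream = true` already yields a byte stream.
        Download::Stream(stream) => NarBody::Stream(stream),
        // Local files arrive as `AsyncRead`; wrap them so chunked NARs can
        // be concatenated into one continuous stream.
        Download::AsyncRead(reader) => NarBody::Stream(Box::pin(ReaderStream::new(reader))),
    }
}
```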
+use std::io::{Error as IoError, ErrorKind as IoErrorKind};
 use std::time::Duration;
 use async_trait::async_trait;
 use aws_sdk_s3::{
-    config::Builder as S3ConfigBuilder, model::CompletedMultipartUpload, model::CompletedPart,
-    presigning::config::PresigningConfig, Client, Credentials, Endpoint, Region,
+    client::fluent_builders::GetObject,
+    config::Builder as S3ConfigBuilder,
+    model::{CompletedMultipartUpload, CompletedPart},
+    presigning::config::PresigningConfig,
+    Client, Credentials, Endpoint, Region,
 };
 use futures::future::join_all;
+use futures::stream::StreamExt;
 use serde::{Deserialize, Serialize};
 use tokio::io::AsyncRead;
@@ -134,6 +139,29 @@ impl S3Backend {
         Ok((client, file))
     }
+
+    async fn get_download(&self, req: GetObject, prefer_stream: bool) -> ServerResult<Download> {
+        if prefer_stream {
+            let output = req.send().await.map_err(ServerError::storage_error)?;
+
+            let stream = StreamExt::map(output.body, |item| {
+                item.map_err(|e| IoError::new(IoErrorKind::Other, e))
+            });
+
+            Ok(Download::Stream(Box::pin(stream)))
+        } else {
+            // FIXME: Configurable expiration
+            let presign_config = PresigningConfig::expires_in(Duration::from_secs(600))
+                .map_err(ServerError::storage_error)?;
+
+            let presigned = req
+                .presigned(presign_config)
+                .await
+                .map_err(ServerError::storage_error)?;
+
+            Ok(Download::Url(presigned.uri().to_string()))
+        }
+    }
 }
 #[async_trait]
@@ -313,38 +341,26 @@ impl StorageBackend for S3Backend {
         Ok(())
     }
-    async fn download_file(&self, name: String) -> ServerResult<Download> {
-        // FIXME: Configurable expiration
-        let presign_config = PresigningConfig::expires_in(Duration::from_secs(10))
-            .map_err(ServerError::storage_error)?;
-
-        let presigned = self
+    async fn download_file(&self, name: String, prefer_stream: bool) -> ServerResult<Download> {
+        let req = self
             .client
             .get_object()
             .bucket(&self.config.bucket)
-            .key(&name)
-            .presigned(presign_config)
-            .await
-            .map_err(ServerError::storage_error)?;
+            .key(&name);
-        Ok(Download::Redirect(presigned.uri().to_string()))
+        self.get_download(req, prefer_stream).await
     }
-    async fn download_file_db(&self, file: &RemoteFile) -> ServerResult<Download> {
+    async fn download_file_db(
+        &self,
+        file: &RemoteFile,
+        prefer_stream: bool,
+    ) -> ServerResult<Download> {
         let (client, file) = self.get_client_from_db_ref(file).await?;
-        let presign_config = PresigningConfig::expires_in(Duration::from_secs(600))
-            .map_err(ServerError::storage_error)?;
+        let req = client.get_object().bucket(&file.bucket).key(&file.key);
-        let presigned = client
-            .get_object()
-            .bucket(&file.bucket)
-            .key(&file.key)
-            .presigned(presign_config)
-            .await
-            .map_err(ServerError::storage_error)?;
-
-        Ok(Download::Redirect(presigned.uri().to_string()))
+        self.get_download(req, prefer_stream).await
     }
     async fn make_db_reference(&self, name: String) -> ServerResult<RemoteFile> {