Mirror of https://github.com/zhaofengli/attic.git

Merge remote-tracking branch 'upstream/main' into rs256-support

Commit 858e5bec13 by Cole Helbling, 2024-07-26 07:25:11 -07:00
24 changed files with 1236 additions and 986 deletions

View file

@@ -13,7 +13,7 @@ charset = utf-8
 # Rust
 [*.rs]
 indent_style = space
-indent_size = 2
+indent_size = 4
 # Misc
 [*.{yaml,yml,md,nix}]

View file

@@ -25,7 +25,6 @@ let
 value = common // {
 inherit outputName;
 outPath = maybeStorePath (builtins.getAttr outputName outputs);
-drvPath = maybeStorePath (builtins.getAttr outputName outputs);
 };
 };
 outputsList = map outputToAttrListElement outputNames;

View file

@@ -11,7 +11,7 @@ jobs:
 matrix:
 os:
   - ubuntu-latest
-  - macos-11
+  - macos-latest
 runs-on: ${{ matrix.os }}
 permissions:
 contents: read

Cargo.lock (generated; 2013 lines changed)

File diff suppressed because it is too large

View file

@@ -6,7 +6,7 @@ publish = false
 [dependencies]
 async-stream = { version = "0.3.5", optional = true }
-base64 = "0.21.2"
+base64 = "0.22.1"
 bytes = "1.4.0"
 displaydoc = "0.2.4"
 digest = "0.10.7"
@@ -53,7 +53,7 @@ default = [ "nix_store", "tokio" ]
 # Native libnixstore bindings.
 #
 # When disabled, the native Rust portions of nix_store can still be used.
-nix_store = [ "dep:cxx", "dep:cxx-build" ]
+nix_store = [ "dep:cxx", "dep:cxx-build", "tokio/rt" ]
 # Tokio.
 #

View file

@@ -24,6 +24,9 @@ fn build_bridge() {
 .flag("nix/config.h")
 .flag("-idirafter")
 .flag(hacky_include.path().to_str().unwrap())
+// In Nix 2.19+, nix/args/root.hh depends on being able to #include "args.hh" (which is in its parent directory), for some reason
+.flag("-I")
+.flag(concat!(env!("NIX_INCLUDE_PATH"), "/nix"))
 .compile("nixbinding");
 println!("cargo:rerun-if-changed=src/nix_store/bindings");

View file

@@ -31,7 +31,6 @@ let
 value = common // {
 inherit outputName;
 outPath = maybeStorePath (builtins.getAttr outputName outputs);
-drvPath = maybeStorePath (builtins.getAttr outputName outputs);
 };
 };
 outputsList = map outputToAttrListElement outputNames;

View file

@@ -12,7 +12,7 @@ path = "src/main.rs"
 attic = { path = "../attic" }
 anyhow = "1.0.71"
-async-channel = "1.8.0"
+async-channel = "2.3.1"
 bytes = "1.4.0"
 clap = { version = "4.3", features = ["derive"] }
 clap_complete = "4.3.0"
@@ -26,7 +26,7 @@ indicatif = "0.17.3"
 lazy_static = "1.4.0"
 notify = { version = "6.0.0", default-features = false, features = ["macos_kqueue"] }
 regex = "1.8.3"
-reqwest = { version = "0.11.18", default-features = false, features = ["json", "rustls-tls", "rustls-tls-native-roots", "stream"] }
+reqwest = { version = "0.12.4", default-features = false, features = ["json", "rustls-tls", "rustls-tls-native-roots", "stream"] }
 serde = { version = "1.0.163", features = ["derive"] }
 serde_json = "1.0.96"
 toml = "0.8.8"

View file

@@ -81,6 +81,7 @@ impl ServerTokenConfig {
 match self {
 ServerTokenConfig::Raw { token } => Ok(token.clone()),
 ServerTokenConfig::File { token_file } => Ok(read_to_string(token_file)
+.map(|t| t.trim().to_string())
 .with_context(|| format!("Failed to read token from {token_file}"))?),
 }
 }
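
The added `.trim()` matters because token files are usually saved with a trailing newline, and a newline is not a legal byte inside an HTTP header value. A minimal self-contained sketch of the failure mode, using the `http` crate (the token value here is made up):

    use http::HeaderValue;

    fn main() {
        // Hypothetical token file contents, saved by an editor with a
        // trailing newline:
        let raw = "s3cr3t\n";
        // Without trimming, the newline lands inside the Authorization
        // header value and is rejected:
        assert!(HeaderValue::from_str(&format!("Bearer {raw}")).is_err());
        // Trimming first yields a valid header value:
        assert!(HeaderValue::from_str(&format!("Bearer {}", raw.trim())).is_ok());
    }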

View file

@@ -62,10 +62,8 @@ let
 ATTIC_DISTRIBUTOR = "attic";
-# Workaround for https://github.com/NixOS/nixpkgs/issues/166205
-env = lib.optionalAttrs stdenv.cc.isClang {
-NIX_LDFLAGS = "-l${stdenv.cc.libcxx.cxxabi.libName}";
-};
+# See comment in `attic/build.rs`
+NIX_INCLUDE_PATH = "${lib.getDev nix}/include";
 # See comment in `attic-tests`
 doCheck = false;
@@ -135,17 +133,15 @@ let
 nativeBuildInputs = nativeBuildInputs ++ [ jq ];
-# Workaround for https://github.com/NixOS/nixpkgs/issues/166205
-env = lib.optionalAttrs stdenv.cc.isClang {
-NIX_LDFLAGS = "-l${stdenv.cc.libcxx.cxxabi.libName}";
-};
 doCheck = true;
 buildPhaseCargoCommand = "";
 checkPhaseCargoCommand = "cargoWithProfile test --no-run --message-format=json >cargo-test.json";
 doInstallCargoArtifacts = false;
+# See comment in `attic/build.rs`
+NIX_INCLUDE_PATH = "${lib.getDev nix}/include";
 installPhase = ''
 runHook preInstall

flake.lock (generated; 18 lines changed)
View file

@@ -7,11 +7,11 @@
 ]
 },
 "locked": {
-"lastModified": 1702918879,
-"narHash": "sha256-tWJqzajIvYcaRWxn+cLUB9L9Pv4dQ3Bfit/YjU5ze3g=",
+"lastModified": 1717025063,
+"narHash": "sha256-dIubLa56W9sNNz0e8jGxrX3CAkPXsq7snuFA/Ie6dn8=",
 "owner": "ipetkov",
 "repo": "crane",
-"rev": "7195c00c272fdd92fc74e7d5a0a2844b9fadb2fb",
+"rev": "480dff0be03dac0e51a8dfc26e882b0d123a450e",
 "type": "github"
 },
 "original": {
@@ -53,11 +53,11 @@
 },
 "nixpkgs": {
 "locked": {
-"lastModified": 1702539185,
-"narHash": "sha256-KnIRG5NMdLIpEkZTnN5zovNYc0hhXjAgv6pfd5Z4c7U=",
+"lastModified": 1711401922,
+"narHash": "sha256-QoQqXoj8ClGo0sqD/qWKFWezgEwUL0SUh37/vY2jNhc=",
 "owner": "NixOS",
 "repo": "nixpkgs",
-"rev": "aa9d4729cbc99dabacb50e3994dcefb3ea0f7447",
+"rev": "07262b18b97000d16a4bdb003418bd2fb067a932",
 "type": "github"
 },
 "original": {
@@ -69,11 +69,11 @@
 },
 "nixpkgs-stable": {
 "locked": {
-"lastModified": 1702780907,
-"narHash": "sha256-blbrBBXjjZt6OKTcYX1jpe9SRof2P9ZYWPzq22tzXAA=",
+"lastModified": 1711460390,
+"narHash": "sha256-akSgjDZL6pVHEfSE6sz1DNSXuYX6hq+P/1Z5IoYWs7E=",
 "owner": "NixOS",
 "repo": "nixpkgs",
-"rev": "1e2e384c5b7c50dbf8e9c441a9e58d85f408b01f",
+"rev": "44733514b72e732bd49f5511bd0203dea9b9a434",
 "type": "github"
 },
 "original": {

View file

@@ -130,6 +130,9 @@
 NIX_PATH = "nixpkgs=${pkgs.path}";
 RUST_SRC_PATH = "${pkgs.rustPlatform.rustcSrc}/library";
+# See comment in `attic/build.rs`
+NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include";
 ATTIC_DISTRIBUTOR = "dev";
 };

View file

@@ -49,10 +49,8 @@ in rustPlatform.buildRustPackage rec {
 ATTIC_DISTRIBUTOR = "attic";
-# Workaround for https://github.com/NixOS/nixpkgs/issues/166205
-env = lib.optionalAttrs stdenv.cc.isClang {
-NIX_LDFLAGS = "-l${stdenv.cc.libcxx.cxxabi.libName}";
-};
+# See comment in `attic/build.rs`
+NIX_INCLUDE_PATH = "${lib.getDev nix}/include";
 # Recursive Nix is not stable yet
 doCheck = false;

View file

@@ -25,11 +25,11 @@ attic-token = { path = "../token" }
 anyhow = "1.0.71"
 async-stream = "0.3.5"
 async-trait = "0.1.68"
-aws-config = "0.57.1"
-aws-sdk-s3 = "0.35.0"
-axum = "0.6.18"
-axum-macros = "0.3.7"
-base64 = "0.21.2"
+aws-config = "1.5.0"
+aws-sdk-s3 = "1.32.0"
+axum = "0.7.5"
+axum-macros = "0.4.1"
+base64 = "0.22.1"
 bytes = "1.4.0"
 chrono = "0.4.24"
 clap = { version = "4.3", features = ["derive"] }
@@ -40,6 +40,7 @@ enum-as-inner = "0.6.0"
 fastcdc = "3.0.3"
 futures = "0.3.28"
 hex = "0.4.3"
+http-body-util = "0.1.1"
 humantime = "2.1.0"
 humantime-serde = "1.1.1"
 itoa = "=1.0.5"
@@ -53,7 +54,7 @@ serde_json = "1.0.96"
 serde_with = "3.0.0"
 tokio-util = { version = "0.7.8", features = [ "io" ] }
 toml = "0.8.8"
-tower-http = { version = "0.4.0", features = [ "catch-panic", "trace" ] }
+tower-http = { version = "0.5.2", features = [ "catch-panic", "trace" ] }
 tracing = "0.1.37"
 tracing-error = "0.2.0"
 tracing-subscriber = { version = "0.3.17", features = [ "json" ] }

View file

@@ -2,7 +2,7 @@
 use attic::cache::CacheName;
 use attic_token::util::parse_authorization_header;
-use axum::{http::Request, middleware::Next, response::Response};
+use axum::{extract::Request, middleware::Next, response::Response};
 use sea_orm::DatabaseConnection;
 use tokio::sync::OnceCell;
@@ -93,7 +93,7 @@ impl AuthState {
 }
 /// Performs auth.
-pub async fn apply_auth<B>(req: Request<B>, next: Next<B>) -> Response {
+pub async fn apply_auth(req: Request, next: Next) -> Response {
 let token: Option<Token> = req
 .headers()
 .get("Authorization")
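
This is part of the axum 0.6 to 0.7 migration: `Request` moved to `axum::extract` with a defaulted body type, and `Next` lost its type parameter, so middleware functions drop the `<B>` generic entirely. A minimal sketch of the new shape (the middleware name is illustrative):

    use axum::{extract::Request, middleware::Next, response::Response};

    // axum 0.7 middleware signature: no <B> parameter anywhere.
    async fn log_requests(req: Request, next: Next) -> Response {
        tracing::debug!(uri = %req.uri(), "handling request");
        next.run(req).await
    }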

View file

@@ -10,7 +10,7 @@ use std::path::PathBuf;
 use std::sync::Arc;
 use axum::{
-body::StreamBody,
+body::Body,
 extract::{Extension, Path},
 http::StatusCode,
 response::{IntoResponse, Redirect, Response},
@@ -18,6 +18,7 @@ use axum::{
 Router,
 };
 use futures::stream::BoxStream;
+use http_body_util::BodyExt;
 use serde::Serialize;
 use tokio_util::io::ReaderStream;
 use tracing::instrument;
@@ -217,7 +218,11 @@ async fn get_nar(
 Download::Url(url) => Ok(Redirect::temporary(&url).into_response()),
 Download::AsyncRead(stream) => {
 let stream = ReaderStream::new(stream);
-let body = StreamBody::new(stream);
+let body = Body::from_stream(stream).map_err(|e| {
+tracing::error!("Stream error: {e}");
+e
+}).into_inner();
 Ok(body.into_response())
 }
 }
@@ -250,7 +255,11 @@ async fn get_nar(
 // TODO: Make num_prefetch configurable
 // The ideal size depends on the average chunk size
 let merged = merge_chunks(chunks, streamer, storage, 2);
-let body = StreamBody::new(merged);
+let body = Body::from_stream(merged).map_err(|e| {
+tracing::error!("Stream error: {e}");
+e
+}).into_inner();
 Ok(body.into_response())
 }
 }
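
`StreamBody` is gone in axum 0.7; the replacement is `Body::from_stream`, which accepts any `TryStream` of byte chunks. A minimal sketch of serving an `AsyncRead` this way (the function name is illustrative):

    use axum::{body::Body, response::{IntoResponse, Response}};
    use tokio::io::AsyncRead;
    use tokio_util::io::ReaderStream;

    // ReaderStream yields Result<Bytes, io::Error>, which Body::from_stream
    // accepts directly in axum 0.7.
    fn stream_response(reader: impl AsyncRead + Send + 'static) -> Response {
        Body::from_stream(ReaderStream::new(reader)).into_response()
    }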

View file

@@ -8,7 +8,8 @@ use anyhow::anyhow;
 use async_compression::tokio::bufread::{BrotliEncoder, XzEncoder, ZstdEncoder};
 use async_compression::Level as CompressionLevel;
 use axum::{
-extract::{BodyStream, Extension, Json},
+body::Body,
+extract::{Extension, Json},
 http::HeaderMap,
 };
 use bytes::{Bytes, BytesMut};
@@ -120,8 +121,9 @@ pub(crate) async fn upload_path(
 Extension(state): Extension<State>,
 Extension(req_state): Extension<RequestState>,
 headers: HeaderMap,
-stream: BodyStream,
+body: Body,
 ) -> ServerResult<Json<UploadPathResult>> {
+let stream = body.into_data_stream();
 let mut stream = StreamReader::new(
 stream.map(|r| r.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))),
 );
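
Likewise, the `BodyStream` extractor no longer exists in axum 0.7: handlers take the `Body` itself and call `into_data_stream()`. A minimal sketch of adapting that stream to an `AsyncRead`, mirroring the conversion above (the handler name and the byte-counting sink are illustrative):

    use axum::body::Body;
    use futures::TryStreamExt;
    use tokio_util::io::StreamReader;

    // into_data_stream() yields Result<Bytes, axum::Error>; mapping the
    // error to io::Error lets StreamReader expose it as an AsyncRead.
    async fn consume_body(body: Body) -> std::io::Result<u64> {
        let stream = body
            .into_data_stream()
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
        let mut reader = StreamReader::new(stream);
        tokio::io::copy(&mut reader, &mut tokio::io::sink()).await
    }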

View file

@@ -158,6 +158,15 @@ async fn run_reap_orphan_chunks(state: &State) -> Result<()> {
 let db = state.database().await?;
 let storage = state.storage().await?;
+let orphan_chunk_limit = match db.get_database_backend() {
+// Arbitrarily chosen sensible value since there's no good default to choose from for MySQL
+sea_orm::DatabaseBackend::MySql => 1000,
+// Panic limit set by sqlx for postgresql: https://github.com/launchbadge/sqlx/issues/671#issuecomment-687043510
+sea_orm::DatabaseBackend::Postgres => u64::from(u16::MAX),
+// Default statement limit imposed by sqlite: https://www.sqlite.org/limits.html#max_variable_number
+sea_orm::DatabaseBackend::Sqlite => 500,
+};
 // find all orphan chunks...
 let orphan_chunk_ids = Query::select()
 .from(Chunk)
@@ -190,6 +199,7 @@ async fn run_reap_orphan_chunks(state: &State) -> Result<()> {
 let orphan_chunks: Vec<chunk::Model> = Chunk::find()
 .filter(chunk::Column::State.eq(ChunkState::Deleted))
+.limit(orphan_chunk_limit)
 .all(db)
 .await?;
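
The limit exists because each selected chunk ID ends up as a bind parameter in the statements that follow, and every backend caps parameters per statement (65535 is the Postgres wire-protocol maximum that sqlx turns into a panic). A tiny self-contained illustration of the resulting batching idea (the numbers are made up, and this is not the server's actual GC loop):

    fn main() {
        let orphan_chunk_limit = 500; // SQLite's conservative cap from above
        let ids: Vec<u64> = (0..1_234).collect();
        // Each batch of at most 500 ids becomes one statement, so the bind
        // parameter count always stays under the backend's limit.
        for batch in ids.chunks(orphan_chunk_limit) {
            assert!(batch.len() <= orphan_chunk_limit);
        }
    }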

View file

@@ -26,6 +26,7 @@ pub mod nix_manifest;
 pub mod oobe;
 mod storage;
+use std::future::IntoFuture;
 use std::net::SocketAddr;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
@@ -38,6 +39,7 @@ use axum::{
 Router,
 };
 use sea_orm::{query::Statement, ConnectionTrait, Database, DatabaseConnection};
+use tokio::net::TcpListener;
 use tokio::sync::OnceCell;
 use tokio::time;
 use tower_http::catch_panic::CatchPanicLayer;
@@ -105,9 +107,28 @@ impl StateInner {
 async fn database(&self) -> ServerResult<&DatabaseConnection> {
 self.database
 .get_or_try_init(|| async {
-Database::connect(&self.config.database.url)
+let db = Database::connect(&self.config.database.url)
 .await
-.map_err(ServerError::database_error)
+.map_err(ServerError::database_error);
+if let Ok(DatabaseConnection::SqlxSqlitePoolConnection(ref conn)) = db {
+// execute some sqlite-specific performance optimizations
+// see https://phiresky.github.io/blog/2020/sqlite-performance-tuning/ for
+// more details
+// intentionally ignore errors from this: this is purely for performance,
+// not for correctness, so we can live without this
+_ = conn
+.execute_unprepared(
+"
+pragma journal_mode=WAL;
+pragma synchronous=normal;
+pragma temp_store=memory;
+pragma mmap_size = 30000000000;
+",
+)
+.await;
+}
+db
 })
 .await
 }
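
For reference, the same pragmas can be applied through sea-orm without matching on the pool variant, since `execute_unprepared` is part of `ConnectionTrait`. A minimal sketch, assuming the URL points at a SQLite database (the helper name is illustrative):

    use sea_orm::{ConnectionTrait, Database, DatabaseConnection, DbErr};

    // Open a SQLite database and apply the write-performance pragmas.
    // Pragma errors are deliberately ignored: they affect performance,
    // not correctness.
    async fn connect_tuned_sqlite(url: &str) -> Result<DatabaseConnection, DbErr> {
        let db = Database::connect(url).await?;
        let _ = db
            .execute_unprepared("pragma journal_mode=WAL; pragma synchronous=normal; pragma temp_store=memory;")
            .await;
        Ok(db)
    }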
@@ -221,14 +242,13 @@ pub async fn run_api_server(cli_listen: Option<SocketAddr>, config: Config) -> R
 eprintln!("Listening on {:?}...", listen);
-let (server_ret, _) = tokio::join!(
-axum::Server::bind(&listen).serve(rest.into_make_service()),
-async {
+let listener = TcpListener::bind(&listen).await?;
+let (server_ret, _) = tokio::join!(axum::serve(listener, rest).into_future(), async {
 if state.config.database.heartbeat {
 let _ = state.run_db_heartbeat().await;
 }
-},
-);
+},);
 server_ret?;
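
This is the axum 0.7 serving model: the re-exported hyper `Server` is gone, so the caller binds a tokio `TcpListener` and hands it to `axum::serve`, with `into_future()` converting the result into a plain future that can sit inside `tokio::join!`. A minimal sketch (the function name is illustrative):

    use std::future::IntoFuture;
    use tokio::net::TcpListener;

    // axum 0.7: explicit listener plus axum::serve instead of Server::bind.
    async fn serve(app: axum::Router, addr: std::net::SocketAddr) -> std::io::Result<()> {
        let listener = TcpListener::bind(addr).await?;
        axum::serve(listener, app).into_future().await
    }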

View file

@@ -3,8 +3,8 @@ use std::sync::Arc;
 use anyhow::anyhow;
 use axum::{
-extract::{Extension, Host},
-http::{HeaderValue, Request},
+extract::{Extension, Host, Request},
+http::HeaderValue,
 middleware::Next,
 response::Response,
 };
@@ -14,11 +14,11 @@ use crate::error::{ErrorKind, ServerResult};
 use attic::api::binary_cache::ATTIC_CACHE_VISIBILITY;
 /// Initializes per-request state.
-pub async fn init_request_state<B>(
+pub async fn init_request_state(
 Extension(state): Extension<State>,
 Host(host): Host,
-mut req: Request<B>,
-next: Next<B>,
+mut req: Request,
+next: Next,
 ) -> Response {
 // X-Forwarded-Proto is an untrusted header
 let client_claims_https =
@@ -45,11 +45,11 @@ pub async fn init_request_state<B>(
 ///
 /// We also require that all request have a Host header in
 /// the first place.
-pub async fn restrict_host<B>(
+pub async fn restrict_host(
 Extension(state): Extension<State>,
 Host(host): Host,
-req: Request<B>,
-next: Next<B>,
+req: Request,
+next: Next,
 ) -> ServerResult<Response> {
 let allowed_hosts = &state.config.allowed_hosts;
@@ -61,10 +61,10 @@ pub async fn restrict_host<B>(
 }
 /// Sets the `X-Attic-Cache-Visibility` header in responses.
-pub(crate) async fn set_visibility_header<B>(
+pub(crate) async fn set_visibility_header(
 Extension(req_state): Extension<RequestState>,
-req: Request<B>,
-next: Next<B>,
+req: Request,
+next: Next,
 ) -> ServerResult<Response> {
 let mut response = next.run(req).await;

View file

@@ -3,6 +3,7 @@
 use std::time::Duration;
 use async_trait::async_trait;
+use aws_config::BehaviorVersion;
 use aws_sdk_s3::{
 config::Builder as S3ConfigBuilder,
 config::{Credentials, Region},
@@ -91,7 +92,7 @@ impl S3Backend {
 }
 async fn config_builder(config: &S3StorageConfig) -> ServerResult<S3ConfigBuilder> {
-let shared_config = aws_config::load_from_env().await;
+let shared_config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await;
 let mut builder = S3ConfigBuilder::from(&shared_config);
 if let Some(credentials) = &config.credentials {
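
`load_from_env` was removed in aws-config 1.x in favor of making the SDK's behavior defaults explicit. Pinning a dated `BehaviorVersion` (as above) freezes those defaults across SDK upgrades; a minimal sketch of the alternative that tracks whatever the installed crate version ships:

    use aws_config::BehaviorVersion;

    // aws-config 1.x: behavior defaults must be chosen explicitly.
    async fn shared_config() -> aws_config::SdkConfig {
        aws_config::load_defaults(BehaviorVersion::latest()).await
    }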

View file

@@ -8,9 +8,10 @@ edition = "2021"
 [dependencies]
 attic = { path = "../attic", default-features = false }
-base64 = "0.21.2"
+base64 = "0.22.1"
 chrono = "0.4.31"
 displaydoc = "0.2.4"
+indexmap = { version = "2.2.6", features = ["serde"] }
 jwt-simple = "0.11.5"
 lazy_static = "1.4.0"
 regex = "1.8.3"

View file

@@ -83,12 +83,13 @@ pub mod util;
 #[cfg(test)]
 mod tests;
-use std::collections::{HashMap, HashSet};
+use std::collections::HashSet;
 use std::error::Error as StdError;
 use base64::{engine::general_purpose::STANDARD as BASE64_STANDARD, Engine};
 use chrono::{DateTime, Utc};
 use displaydoc::Display;
+use indexmap::IndexMap;
 use jwt_simple::prelude::{Duration, RSAKeyPairLike, RSAPublicKeyLike, VerificationOptions};
 pub use jwt_simple::{
 algorithms::{HS256Key, MACLike, RS256KeyPair, RS256PublicKey},
@@ -147,7 +148,7 @@ pub struct AtticAccess {
 /// Cache permissions.
 ///
 /// Keys here may include wildcards.
-caches: HashMap<CacheNamePattern, CachePermission>,
+caches: IndexMap<CacheNamePattern, CachePermission>,
 }
 /// Permission to a single cache.
@@ -354,7 +355,7 @@ impl Token {
 &mut self,
 pattern: CacheNamePattern,
 ) -> &mut CachePermission {
-use std::collections::hash_map::Entry;
+use indexmap::map::Entry;
 let access = self.attic_access_mut();
 match access.caches.entry(pattern) {
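
Swapping `HashMap` for `IndexMap` is what makes permission lookup deterministic: `IndexMap` iterates in insertion order, while `HashMap` iterates in a per-process random order and could visit `all-ci-*` before `all-*`. A minimal self-contained illustration (pattern matching is simplified to a prefix check):

    use indexmap::IndexMap;

    fn main() {
        let mut caches = IndexMap::new();
        caches.insert("all-*", "pull");
        caches.insert("all-ci-*", "push");
        // Insertion order is preserved, so the first matching pattern is
        // stable across runs -- unlike a std HashMap.
        let first = caches
            .iter()
            .find(|(pattern, _)| pattern.starts_with("all-"))
            .map(|(_, perm)| *perm);
        assert_eq!(first, Some("pull"));
    }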

View file

@@ -18,6 +18,8 @@ fn test_basic() {
 "nbf": 0,
 "https://jwt.attic.rs/v1": {
 "caches": {
+"all-*": {"r":1},
+"all-ci-*": {"w":1},
 "cache-rw": {"r":1,"w":1},
 "cache-ro": {"r":1},
 "team-*": {"r":1,"w":1,"cc":1}
@@ -33,7 +35,30 @@ fn test_basic() {
 let dec_key = decode_token_rs256_secret_base64(base64_secret).unwrap();
 // TOKEN=$(jq -c < json | jwt encode --alg RS256 --secret @./rs256 -)
-let token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJleHAiOjQxMDIzMjQ5ODYsImh0dHBzOi8vand0LmF0dGljLnJzL3YxIjp7ImNhY2hlcyI6eyJjYWNoZS1ybyI6eyJyIjoxfSwiY2FjaGUtcnciOnsiciI6MSwidyI6MX0sInRlYW0tKiI6eyJjYyI6MSwiciI6MSwidyI6MX19fSwiaWF0IjoxNjk5NzM0NTU3LCJuYmYiOjAsInN1YiI6Im1lb3cifQ.k1TCqAg5_yaBQByKnYn5zSvMsYi8XrHe1h8T2hijZiP1SsYYnKphKKm0e61lmr3tSM-3dtRRCNGB7elhetpuz2jz8fWyBmpjO-yIX2uB787iRKVjaVCEKSPjcKO9lGp9LlxKdNH0SLRmdwkJGQUHbzN6QurfiV4C54cPxC_43EamkOqFUFmmwohi_r76RZtMb8uyt-9t7Canpm7GfJg4uVg3MLgbvCKxJ4BSu4UgXPz-MYupHS_pIEtlCY8FjlVrXlBLAleUvcBPY2qML9gxpqBrh9s1qfLpCeTZkG-vDjb_Y8X0gXa0OshFrvnoIyHwDc9jmj1X35T0YslyjbQXWQ";
+let token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjQxMDIzMjQ5ODYsImh0dHBzOi8vand0LmF0dGljLnJzL3YxIjp7ImNhY2hlcyI6eyJhbGwtKiI6eyJyIjoxfSwiYWxsLWNpLSoiOnsidyI6MX0sImNhY2hlLXJvIjp7InIiOjF9LCJjYWNoZS1ydyI6eyJyIjoxLCJ3IjoxfSwidGVhbS0qIjp7ImNjIjoxLCJyIjoxLCJ3IjoxfX19LCJpYXQiOjE3MTY2NjA1ODksInN1YiI6Im1lb3cifQ.8vtxp_1OEYdcnkGPM4c9ORXooJZV7DOTS4NRkMKN8mw";
+// NOTE(cole-h): check that we get a consistent iteration order when getting permissions for
+// caches -- this depends on the order of the fields in the token, but should otherwise be
+// consistent between iterations
+let mut was_ever_wrong = false;
+for _ in 0..=1_000 {
+// NOTE(cole-h): we construct a new Token every iteration in order to get different "random
+// state"
+let decoded =
+Token::from_jwt(token, &SignatureType::RS256(dec_key.clone()), &None, &None).unwrap();
+let perm_all_ci = decoded.get_permission_for_cache(&cache! { "all-ci-abc" });
+// NOTE(cole-h): if the iteration order of the token is inconsistent, the permissions may be
+// retrieved from the `all-ci-*` pattern (which only allows writing/pushing), even though
+// the `all-*` pattern (which only allows reading/pulling) is specified first
+if perm_all_ci.require_pull().is_err() || perm_all_ci.require_push().is_ok() {
+was_ever_wrong = true;
+}
+}
+assert!(
+!was_ever_wrong,
+"Iteration order should be consistent to prevent random auth failures (and successes)"
+);
 let decoded = Token::from_jwt(token, &SignatureType::RS256(dec_key), &None, &None).unwrap();