1
0
Fork 0
mirror of https://github.com/zhaofengli/attic.git synced 2024-12-14 11:57:30 +00:00

server: Attach tracing context to errors

This commit is contained in:
Zhaofeng Li 2023-01-04 21:05:07 -07:00
parent 27836028f4
commit c04aff7c48
17 changed files with 200 additions and 108 deletions

24
Cargo.lock generated
View file

@ -233,6 +233,7 @@ dependencies = [
"toml",
"tower-http",
"tracing",
"tracing-error",
"tracing-subscriber",
"uuid",
"xdg",
@ -3609,6 +3610,16 @@ dependencies = [
"valuable",
]
[[package]]
name = "tracing-error"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d686ec1c0f384b1277f097b2f279a2ecc11afe8c133c1aabf036a27cb4cd206e"
dependencies = [
"tracing",
"tracing-subscriber",
]
[[package]]
name = "tracing-futures"
version = "0.2.5"
@ -3630,6 +3641,16 @@ dependencies = [
"tracing-core",
]
[[package]]
name = "tracing-serde"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1"
dependencies = [
"serde",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.16"
@ -3640,12 +3661,15 @@ dependencies = [
"nu-ansi-term",
"once_cell",
"regex",
"serde",
"serde_json",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
"tracing-serde",
]
[[package]]

View file

@ -5,7 +5,3 @@ members = [
"client",
"server",
]
[profile.dev]
opt-level = 2
incremental = true

View file

@ -73,12 +73,15 @@
rustfmt clippy
cargo-expand cargo-outdated cargo-edit
tokio-console
sqlite-interactive
editorconfig-checker
flyctl
wrk
] ++ (lib.optionals pkgs.stdenv.isLinux [
linuxPackages.perf
]);

View file

@ -52,11 +52,12 @@ serde_json = "1.0.91"
serde_with = "2.1.0"
tokio-util = { version = "0.7.4", features = [ "io" ] }
toml = "0.5.10"
tower-http = { version = "0.3.5", features = [ "catch-panic" ] }
tower-http = { version = "0.3.5", features = [ "catch-panic", "trace" ] }
tracing = "0.1.37"
tracing-subscriber = "0.3.16"
tracing-error = "0.2.0"
tracing-subscriber = { version = "0.3.16", features = [ "json" ] }
uuid = { version = "1.2.2", features = ["v4"] }
console-subscriber = { version = "0.1.8", optional = true }
console-subscriber = "0.1.8"
xdg = "2.4.1"
[dependencies.async-compression]
@ -92,6 +93,3 @@ features = [
"rt-multi-thread",
"sync",
]
[features]
tokio-console = ["dep:console-subscriber"]

View file

@ -65,23 +65,17 @@ impl AuthState {
d
}
Err(e) => {
if permission.can_discover() {
return Err(e);
} else {
return Err(e.into_no_discovery_permissions());
}
Err(mut e) => {
e.set_discovery_permission(permission.can_discover());
return Err(e);
}
};
match f(cache, &mut permission) {
Ok(t) => Ok(t),
Err(e) => {
if permission.can_discover() {
Err(e)
} else {
Err(e.into_no_discovery_permissions())
}
Err(mut e) => {
e.set_discovery_permission(permission.can_discover());
Err(e)
}
}
}

View file

@ -19,7 +19,7 @@ use tokio_util::io::ReaderStream;
use tracing::instrument;
use crate::database::AtticDatabase;
use crate::error::{ServerError, ServerResult};
use crate::error::{ErrorKind, ServerResult};
use crate::narinfo::NarInfo;
use crate::nix_manifest;
use crate::storage::Download;
@ -101,6 +101,7 @@ async fn get_nix_cache_info(
/// - HEAD `/:cache/{storePathHash}.narinfo`
/// - GET `/:cache/{storePathHash}.ls` (not implemented)
#[instrument(skip_all, fields(cache_name, path))]
#[axum_macros::debug_handler]
async fn get_store_path_info(
Extension(state): Extension<State>,
Extension(req_state): Extension<RequestState>,
@ -109,12 +110,12 @@ async fn get_store_path_info(
let components: Vec<&str> = path.splitn(2, '.').collect();
if components.len() != 2 {
return Err(ServerError::NotFound);
return Err(ErrorKind::NotFound.into());
}
// TODO: Other endpoints
if components[1] != "narinfo" {
return Err(ServerError::NotFound);
return Err(ErrorKind::NotFound.into());
}
let store_path_hash = StorePathHash::new(components[0].to_string())?;
@ -162,11 +163,11 @@ async fn get_nar(
let components: Vec<&str> = path.splitn(2, '.').collect();
if components.len() != 2 {
return Err(ServerError::NotFound);
return Err(ErrorKind::NotFound.into());
}
if components[1] != "nar" {
return Err(ServerError::NotFound);
return Err(ErrorKind::NotFound.into());
}
let store_path_hash = StorePathHash::new(components[0].to_string())?;

View file

@ -10,7 +10,7 @@ use tracing::instrument;
use crate::database::entity::cache::{self, Entity as Cache};
use crate::database::entity::Json as DbJson;
use crate::error::{ServerError, ServerResult};
use crate::error::{ErrorKind, ServerError, ServerResult};
use crate::{RequestState, State};
use attic::api::v1::cache_config::{
CacheConfig, CreateCacheRequest, KeypairConfig, RetentionPeriodConfig,
@ -115,7 +115,7 @@ pub(crate) async fn configure_cache(
}
RetentionPeriodConfig::Period(period) => {
update.retention_period = Set(Some(period.try_into().map_err(|_| {
ServerError::RequestError(anyhow!("Invalid retention period"))
ErrorKind::RequestError(anyhow!("Invalid retention period"))
})?));
}
}
@ -131,9 +131,9 @@ pub(crate) async fn configure_cache(
Ok(())
} else {
Err(ServerError::RequestError(anyhow!(
Err(ErrorKind::RequestError(anyhow!(
"No modifiable fields were set."
)))
)).into())
}
}
@ -164,7 +164,7 @@ pub(crate) async fn destroy_cache(
if deletion.rows_affected == 0 {
// Someone raced to (soft) delete the cache before us
Err(ServerError::NoSuchCache)
Err(ErrorKind::NoSuchCache.into())
} else {
Ok(())
}
@ -179,7 +179,7 @@ pub(crate) async fn destroy_cache(
if deletion.rows_affected == 0 {
// Someone raced to (soft) delete the cache before us
Err(ServerError::NoSuchCache)
Err(ErrorKind::NoSuchCache.into())
} else {
Ok(())
}
@ -224,7 +224,7 @@ pub(crate) async fn create_cache(
if num_inserted == 0 {
// The cache already exists
Err(ServerError::CacheAlreadyExists)
Err(ErrorKind::CacheAlreadyExists.into())
} else {
Ok(())
}

View file

@ -23,7 +23,7 @@ use tracing::instrument;
use uuid::Uuid;
use crate::config::CompressionType;
use crate::error::{ServerError, ServerResult};
use crate::error::{ErrorKind, ServerError, ServerResult};
use crate::narinfo::Compression;
use crate::{RequestState, State};
use attic::api::v1::upload_path::UploadPathNarInfo;
@ -88,7 +88,7 @@ pub(crate) async fn upload_path(
let upload_info: UploadPathNarInfo = {
let header = headers
.get("X-Attic-Nar-Info")
.ok_or_else(|| ServerError::RequestError(anyhow!("X-Attic-Nar-Info must be set")))?;
.ok_or_else(|| ErrorKind::RequestError(anyhow!("X-Attic-Nar-Info must be set")))?;
serde_json::from_slice(header.as_bytes()).map_err(ServerError::request_error)?
};
@ -147,7 +147,7 @@ async fn upload_path_dedup(
|| *nar_size != upload_info.nar_size
|| *nar_size != existing_nar.nar_size as usize
{
return Err(ServerError::RequestError(anyhow!("Bad NAR Hash or Size")));
return Err(ErrorKind::RequestError(anyhow!("Bad NAR Hash or Size")).into());
}
// Finally...
@ -280,7 +280,7 @@ async fn upload_path_new(
let file_hash = Hash::Sha256(file_hash.as_slice().try_into().unwrap());
if nar_hash != upload_info.nar_hash || *nar_size != upload_info.nar_size {
return Err(ServerError::RequestError(anyhow!("Bad NAR Hash or Size")));
return Err(ErrorKind::RequestError(anyhow!("Bad NAR Hash or Size")).into());
}
// Finally...

View file

@ -12,7 +12,7 @@ use sea_orm::sea_query::{Expr, LockBehavior, LockType, Query, Value};
use sea_orm::{ActiveValue::Set, ConnectionTrait, DatabaseConnection, FromQueryResult};
use tokio::task;
use crate::error::{ServerError, ServerResult};
use crate::error::{ErrorKind, ServerError, ServerResult};
use attic::cache::CacheName;
use attic::hash::Hash;
use attic::nix_store::StorePathHash;
@ -88,7 +88,7 @@ impl AtticDatabase for DatabaseConnection {
.query_one(stmt)
.await
.map_err(ServerError::database_error)?
.ok_or(ServerError::NoSuchObject)?;
.ok_or(ErrorKind::NoSuchObject)?;
let object = object::Model::from_query_result(&result, SELECT_OBJECT)
.map_err(ServerError::database_error)?;
@ -107,7 +107,7 @@ impl AtticDatabase for DatabaseConnection {
.one(self)
.await
.map_err(ServerError::database_error)?
.ok_or(ServerError::NoSuchCache)
.ok_or_else(|| ErrorKind::NoSuchCache.into())
}
async fn find_and_lock_nar(&self, nar_hash: &Hash) -> ServerResult<Option<NarGuard>> {

View file

@ -1,6 +1,7 @@
//! Error handling.
use std::error::Error as StdError;
use std::fmt;
use anyhow::Error as AnyError;
use axum::http::StatusCode;
@ -8,14 +9,28 @@ use axum::response::{IntoResponse, Response};
use axum::Json;
use displaydoc::Display;
use serde::Serialize;
use tracing_error::SpanTrace;
use attic::error::AtticError;
pub type ServerResult<T> = Result<T, ServerError>;
/// An error.
/// A server error.
#[derive(Debug)]
pub struct ServerError {
/// The kind of the error.
kind: ErrorKind,
/// Whether the client that caused the error has discovery permissions.
discovery_permission: bool,
/// Context of where the error occurred.
context: SpanTrace,
}
/// The kind of an error.
#[derive(Debug, Display)]
pub enum ServerError {
pub enum ErrorKind {
// Generic responses
/// The URL you requested was not found.
NotFound,
@ -67,17 +82,82 @@ pub struct ErrorResponse {
impl ServerError {
pub fn database_error(error: impl StdError + Send + Sync + 'static) -> Self {
Self::DatabaseError(AnyError::new(error))
ErrorKind::DatabaseError(AnyError::new(error)).into()
}
pub fn storage_error(error: impl StdError + Send + Sync + 'static) -> Self {
Self::StorageError(AnyError::new(error))
ErrorKind::StorageError(AnyError::new(error)).into()
}
pub fn request_error(error: impl StdError + Send + Sync + 'static) -> Self {
Self::RequestError(AnyError::new(error))
ErrorKind::RequestError(AnyError::new(error)).into()
}
pub fn set_discovery_permission(&mut self, perm: bool) {
self.discovery_permission = perm;
}
}
impl fmt::Display for ServerError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "{}", self.kind)?;
self.context.fmt(f)?;
Ok(())
}
}
impl From<ErrorKind> for ServerError {
fn from(kind: ErrorKind) -> Self {
Self {
kind,
discovery_permission: true,
context: SpanTrace::capture(),
}
}
}
impl From<AtticError> for ServerError {
fn from(error: AtticError) -> Self {
ErrorKind::AtticError(error).into()
}
}
impl From<super::access::Error> for ServerError {
fn from(error: super::access::Error) -> Self {
ErrorKind::AccessError(error).into()
}
}
impl StdError for ServerError {}
impl IntoResponse for ServerError {
fn into_response(self) -> Response {
// TODO: Better logging control
if matches!(self.kind, ErrorKind::DatabaseError(_) | ErrorKind::StorageError(_) | ErrorKind::ManifestSerializationError(_) | ErrorKind::AtticError(_)) {
tracing::error!("{}", self);
}
let kind = if self.discovery_permission {
self.kind
} else {
self.kind.into_no_discovery_permissions()
};
// TODO: don't sanitize in dev mode
let sanitized = kind.into_clients();
let status_code = sanitized.http_status_code();
let error_response = ErrorResponse {
code: status_code.as_u16(),
message: sanitized.to_string(),
error: sanitized.name().to_string(),
};
(status_code, Json(error_response)).into_response()
}
}
impl ErrorKind {
fn name(&self) -> &'static str {
match self {
Self::NotFound => "NotFound",
@ -99,7 +179,7 @@ impl ServerError {
/// Returns a more restricted version of this error for a client without discovery
/// permissions.
pub fn into_no_discovery_permissions(self) -> Self {
fn into_no_discovery_permissions(self) -> Self {
match self {
Self::NoSuchCache => Self::Unauthorized,
Self::NoSuchObject => Self::Unauthorized,
@ -138,39 +218,4 @@ impl ServerError {
_ => StatusCode::INTERNAL_SERVER_ERROR,
}
}
}
impl StdError for ServerError {}
impl From<AtticError> for ServerError {
fn from(error: AtticError) -> Self {
Self::AtticError(error)
}
}
impl From<super::access::Error> for ServerError {
fn from(error: super::access::Error) -> Self {
Self::AccessError(error)
}
}
impl IntoResponse for ServerError {
fn into_response(self) -> Response {
// TODO: Better logging control
if matches!(self, Self::DatabaseError(_) | Self::StorageError(_) | Self::ManifestSerializationError(_) | Self::AtticError(_)) {
tracing::error!("{:?}", self);
}
// TODO: don't sanitize in dev mode
let sanitized = self.into_clients();
let status_code = sanitized.http_status_code();
let error_response = ErrorResponse {
code: status_code.as_u16(),
message: sanitized.to_string(),
error: sanitized.name().to_string(),
};
(status_code, Json(error_response)).into_response()
}
}
}

View file

@ -39,12 +39,13 @@ use sea_orm::{query::Statement, ConnectionTrait, Database, DatabaseConnection};
use tokio::sync::OnceCell;
use tokio::time;
use tower_http::catch_panic::CatchPanicLayer;
use tower_http::trace::TraceLayer;
use access::http::{apply_auth, AuthState};
use attic::cache::CacheName;
use config::{Config, StorageConfig};
use database::migration::{Migrator, MigratorTrait};
use error::{ServerError, ServerResult};
use error::{ErrorKind, ServerError, ServerResult};
use middleware::{init_request_state, restrict_host};
use storage::{LocalBackend, S3Backend, StorageBackend};
@ -171,7 +172,7 @@ impl RequestStateInner {
/// The fallback route.
#[axum_macros::debug_handler]
async fn fallback(_: Uri) -> ServerResult<()> {
Err(ServerError::NotFound)
Err(ErrorKind::NotFound.into())
}
/// Runs the API server.
@ -194,6 +195,7 @@ pub async fn run_api_server(cli_listen: Option<SocketAddr>, config: Config) -> R
.layer(axum::middleware::from_fn(init_request_state))
.layer(axum::middleware::from_fn(restrict_host))
.layer(Extension(state.clone()))
.layer(TraceLayer::new_for_http())
.layer(CatchPanicLayer::new());
eprintln!("Listening on {:?}...", listen);

View file

@ -5,6 +5,10 @@ use std::path::PathBuf;
use anyhow::Result;
use clap::{Parser, ValueEnum};
use tokio::join;
use tokio::task::spawn;
use tracing_error::ErrorLayer;
use tracing_subscriber::prelude::*;
use tracing_subscriber::EnvFilter;
use attic_server::config;
@ -26,6 +30,12 @@ struct Opts {
/// Mode to run.
#[clap(long, default_value = "monolithic")]
mode: ServerMode,
/// Whether to enable tokio-console.
///
/// The console server will listen on its default port.
#[clap(long)]
tokio_console: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
@ -51,10 +61,11 @@ enum ServerMode {
#[tokio::main]
async fn main() -> Result<()> {
init_logging();
let opts = Opts::parse();
init_logging(opts.tokio_console);
dump_version();
let opts = Opts::parse();
let config = if let Some(config_path) = opts.config {
config::load_config_from_path(&config_path)
} else if let Ok(config_env) = env::var("ATTIC_SERVER_CONFIG_BASE64") {
@ -106,12 +117,30 @@ async fn main() -> Result<()> {
Ok(())
}
fn init_logging() {
#[cfg(not(feature = "tokio-console"))]
tracing_subscriber::fmt::init();
fn init_logging(tokio_console: bool) {
let env_filter = EnvFilter::from_default_env();
let fmt_layer = tracing_subscriber::fmt::layer()
.with_filter(env_filter);
#[cfg(feature = "tokio-console")]
console_subscriber::init();
let error_layer = ErrorLayer::default();
let console_layer = if tokio_console {
let (layer, server) = console_subscriber::ConsoleLayer::new();
spawn(server.serve());
Some(layer)
} else {
None
};
tracing_subscriber::registry()
.with(fmt_layer)
.with(error_layer)
.with(console_layer)
.init();
if tokio_console {
eprintln!("Note: tokio-console is enabled");
}
}
fn dump_version() {

View file

@ -9,7 +9,7 @@ use axum::{
};
use super::{AuthState, RequestStateInner, State};
use crate::error::{ServerError, ServerResult};
use crate::error::{ErrorKind, ServerResult};
/// Initializes per-request state.
pub async fn init_request_state<B>(
@ -50,7 +50,7 @@ pub async fn restrict_host<B>(
let allowed_hosts = &state.config.allowed_hosts;
if !allowed_hosts.is_empty() && !allowed_hosts.iter().any(|h| h.as_str() == host) {
return Err(ServerError::RequestError(anyhow!("Bad Host")));
return Err(ErrorKind::RequestError(anyhow!("Bad Host")).into());
}
Ok(next.run(req).await)

View file

@ -48,7 +48,7 @@ use serde::de;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use crate::error::{ServerError, ServerResult};
use crate::error::{ErrorKind, ServerError, ServerResult};
use crate::nix_manifest::{self, SpaceDelimitedList};
use attic::hash::Hash;
use attic::mime;
@ -252,9 +252,9 @@ impl FromStr for Compression {
"bzip2" => Ok(Self::Bzip2),
"br" => Ok(Self::Brotli),
"zstd" => Ok(Self::Zstd),
_ => Err(ServerError::InvalidCompressionType {
_ => Err(ErrorKind::InvalidCompressionType {
name: s.to_string(),
}),
}.into()),
}
}
}

View file

@ -31,7 +31,7 @@ use displaydoc::Display;
use serde::{de, ser, Deserialize, Serialize};
use serde_with::{formats::SpaceSeparator, StringWithSeparator};
use crate::error::{ServerError, ServerResult};
use crate::error::{ErrorKind, ServerResult};
use deserializer::Deserializer;
use serializer::Serializer;
@ -42,7 +42,7 @@ where
T: for<'de> Deserialize<'de>,
{
let mut deserializer = Deserializer::from_str(s);
T::deserialize(&mut deserializer).map_err(ServerError::ManifestSerializationError)
T::deserialize(&mut deserializer).map_err(|e| ErrorKind::ManifestSerializationError(e).into())
// FIXME: Reject extra output??
}
@ -54,7 +54,7 @@ where
let mut serializer = Serializer::new();
value
.serialize(&mut serializer)
.map_err(ServerError::ManifestSerializationError)?;
.map_err(ErrorKind::ManifestSerializationError)?;
Ok(serializer.into_output())
}

View file

@ -8,7 +8,7 @@ use tokio::fs::{self, File};
use tokio::io::{self, AsyncRead};
use super::{Download, RemoteFile, StorageBackend};
use crate::error::{ServerError, ServerResult};
use crate::error::{ErrorKind, ServerError, ServerResult};
#[derive(Debug)]
pub struct LocalBackend {
@ -74,9 +74,9 @@ impl StorageBackend for LocalBackend {
let file = if let RemoteFile::Local(file) = file {
file
} else {
return Err(ServerError::StorageError(anyhow::anyhow!(
return Err(ErrorKind::StorageError(anyhow::anyhow!(
"Does not understand the remote file reference"
)));
)).into());
};
fs::remove_file(self.get_path(&file.name))
@ -98,9 +98,9 @@ impl StorageBackend for LocalBackend {
let file = if let RemoteFile::Local(file) = file {
file
} else {
return Err(ServerError::StorageError(anyhow::anyhow!(
return Err(ErrorKind::StorageError(anyhow::anyhow!(
"Does not understand the remote file reference"
)));
)).into());
};
let file = File::open(self.get_path(&file.name))

View file

@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncReadExt};
use super::{Download, RemoteFile, StorageBackend};
use crate::error::{ServerError, ServerResult};
use crate::error::{ErrorKind, ServerError, ServerResult};
use attic::util::Finally;
/// The chunk size for each part in a multipart upload.
@ -113,9 +113,9 @@ impl S3Backend {
let file = if let RemoteFile::S3(file) = file {
file
} else {
return Err(ServerError::StorageError(anyhow::anyhow!(
return Err(ErrorKind::StorageError(anyhow::anyhow!(
"Does not understand the remote file reference"
)));
)).into());
};
// FIXME: Ugly