From 93a38d1b1e96bc259246b3ea0005193da969b497 Mon Sep 17 00:00:00 2001
From: Zhaofeng Li
Date: Sat, 14 Jan 2023 23:55:10 -0700
Subject: [PATCH] Move read_chunk_async to attic

---
 attic/src/stream.rs      | 30 +++++++++++++++++++++++++++++-
 server/src/storage/s3.rs | 33 ++++++++-------------------------
 2 files changed, 37 insertions(+), 26 deletions(-)

diff --git a/attic/src/stream.rs b/attic/src/stream.rs
index f37bb62..8d71c43 100644
--- a/attic/src/stream.rs
+++ b/attic/src/stream.rs
@@ -6,7 +6,7 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use digest::{Digest, Output as DigestOutput};
-use tokio::io::{AsyncRead, ReadBuf};
+use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
 use tokio::sync::OnceCell;
 
 /// Stream filter that hashes the bytes that have been read.
@@ -73,6 +73,34 @@ impl<R: AsyncRead + Unpin, D: Digest + Unpin> AsyncRead for StreamHasher<R, D>
     }
 }
 
+/// Greedily reads from a stream for some number of bytes.
+///
+/// This was originally from rust-s3 but completely rewritten to resolve
+/// performance problems.
+pub async fn read_chunk_async<S: AsyncRead + Unpin + Send>(
+    stream: &mut S,
+    max_size: usize,
+) -> std::io::Result<Vec<u8>> {
+    let mut chunk: Box<[u8]> = vec![0u8; max_size].into_boxed_slice();
+    let mut cursor = 0;
+
+    while cursor < max_size {
+        let buf = &mut chunk[cursor..];
+        let read = stream.read(buf).await?;
+
+        if read == 0 {
+            break;
+        } else {
+            cursor += read;
+        }
+    }
+
+    let mut vec = chunk.into_vec();
+    vec.truncate(cursor);
+
+    Ok(vec)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs
index a0963b3..494a840 100644
--- a/server/src/storage/s3.rs
+++ b/server/src/storage/s3.rs
@@ -9,10 +9,11 @@ use aws_sdk_s3::{
 };
 use futures::future::join_all;
 use serde::{Deserialize, Serialize};
-use tokio::io::{AsyncRead, AsyncReadExt};
+use tokio::io::AsyncRead;
 
 use super::{Download, RemoteFile, StorageBackend};
 use crate::error::{ErrorKind, ServerError, ServerResult};
+use attic::stream::read_chunk_async;
 use attic::util::Finally;
 
 /// The chunk size for each part in a multipart upload.
@@ -142,7 +143,9 @@ impl StorageBackend for S3Backend {
         name: String,
         mut stream: &mut (dyn AsyncRead + Unpin + Send),
     ) -> ServerResult<RemoteFile> {
-        let first_chunk = read_chunk_async(&mut stream).await?;
+        let first_chunk = read_chunk_async(&mut stream, CHUNK_SIZE)
+            .await
+            .map_err(ServerError::storage_error)?;
 
         if first_chunk.len() < CHUNK_SIZE {
             // do a normal PutObject
@@ -207,7 +210,9 @@ impl StorageBackend for S3Backend {
             let chunk = if part_number == 1 {
                 first_chunk.take().unwrap()
             } else {
-                read_chunk_async(&mut stream).await?
+                read_chunk_async(&mut stream, CHUNK_SIZE)
+                    .await
+                    .map_err(ServerError::storage_error)?
             };
 
             if chunk.is_empty() {
@@ -350,25 +355,3 @@ impl StorageBackend for S3Backend {
         }))
     }
 }
-
-// adapted from rust-s3
-async fn read_chunk_async<S: AsyncRead + Unpin + Send>(stream: &mut S) -> ServerResult<Vec<u8>> {
-    let mut chunk: Box<[u8]> = vec![0u8; CHUNK_SIZE].into_boxed_slice();
-    let mut cursor = 0;
-
-    while cursor < CHUNK_SIZE {
-        let buf = &mut chunk[cursor..];
-        let read = stream.read(buf).await.map_err(ServerError::storage_error)?;
-
-        if read == 0 {
-            break;
-        } else {
-            cursor += read;
-        }
-    }
-
-    let mut vec = chunk.into_vec();
-    vec.truncate(cursor);
-
-    Ok(vec)
-}
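
Usage sketch (not part of the patch): the function moved into attic above accepts any
stream that is AsyncRead + Unpin + Send, so a caller outside the S3 backend might drive
it as below. This is a minimal, hypothetical example; the Tokio file source, the
"example.bin" path, and the MAX_SIZE constant are illustrative assumptions, not code
from this commit.

use attic::stream::read_chunk_async;
use tokio::fs::File;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Illustrative chunk size; the server keeps its own CHUNK_SIZE constant.
    const MAX_SIZE: usize = 8 * 1024 * 1024;

    // Hypothetical input; tokio::fs::File is AsyncRead + Unpin + Send.
    let mut stream = File::open("example.bin").await?;

    loop {
        // Greedily fills up to MAX_SIZE bytes per call; a chunk shorter
        // than MAX_SIZE (possibly empty) means the stream is exhausted.
        let chunk = read_chunk_async(&mut stream, MAX_SIZE).await?;
        println!("read {} bytes", chunk.len());

        if chunk.len() < MAX_SIZE {
            break;
        }
    }

    Ok(())
}

Taking max_size as a parameter (instead of baking in the server's CHUNK_SIZE, as the
deleted s3.rs copy did) and returning plain std::io::Result is what lets the helper
live in the shared attic crate: the ServerError mapping now happens at the call sites
via map_err(ServerError::storage_error).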