
Move read_chunk_async to attic

Zhaofeng Li 2023-01-14 23:55:10 -07:00
parent 6d3b2bd381
commit 93a38d1b1e
2 changed files with 37 additions and 26 deletions


@@ -6,7 +6,7 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use digest::{Digest, Output as DigestOutput};
-use tokio::io::{AsyncRead, ReadBuf};
+use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
 use tokio::sync::OnceCell;
 
 /// Stream filter that hashes the bytes that have been read.
@@ -73,6 +73,34 @@ impl<R: AsyncRead + Unpin, D: Digest + Unpin> AsyncRead for StreamHasher<R, D> {
     }
 }
 
+/// Greedily reads from a stream for some number of bytes.
+///
+/// This was originally from rust-s3 but completely rewritten to resolve
+/// performance problems.
+pub async fn read_chunk_async<S: AsyncRead + Unpin + Send>(
+    stream: &mut S,
+    max_size: usize,
+) -> std::io::Result<Vec<u8>> {
+    let mut chunk: Box<[u8]> = vec![0u8; max_size].into_boxed_slice();
+    let mut cursor = 0;
+
+    while cursor < max_size {
+        let buf = &mut chunk[cursor..];
+        let read = stream.read(buf).await?;
+
+        if read == 0 {
+            break;
+        } else {
+            cursor += read;
+        }
+    }
+
+    let mut vec = chunk.into_vec();
+    vec.truncate(cursor);
+
+    Ok(vec)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
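A quick illustration of the moved helper's contract (a sketch, not a test from this commit): unlike a single read() call, which may legally return a short read at any time, read_chunk_async keeps reading until the buffer is full, so a chunk shorter than max_size can only mean the stream has ended.

// Illustrative only: tokio implements AsyncRead for `&[u8]`, so an
// in-memory slice stands in for a real stream here.
#[tokio::test]
async fn read_chunk_async_is_greedy() -> std::io::Result<()> {
    let data = [42u8; 100];
    let mut reader = &data[..];

    // While data remains, the chunk is always filled to `max_size`.
    let chunk = read_chunk_async(&mut reader, 64).await?;
    assert_eq!(chunk.len(), 64);

    // A short chunk is only possible at end-of-stream.
    let chunk = read_chunk_async(&mut reader, 64).await?;
    assert_eq!(chunk.len(), 36);

    // After EOF, an empty chunk is returned.
    let chunk = read_chunk_async(&mut reader, 64).await?;
    assert!(chunk.is_empty());

    Ok(())
}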


@@ -9,10 +9,11 @@ use aws_sdk_s3::{
 };
 use futures::future::join_all;
 use serde::{Deserialize, Serialize};
-use tokio::io::{AsyncRead, AsyncReadExt};
+use tokio::io::AsyncRead;
 
 use super::{Download, RemoteFile, StorageBackend};
 use crate::error::{ErrorKind, ServerError, ServerResult};
+use attic::stream::read_chunk_async;
 use attic::util::Finally;
 
 /// The chunk size for each part in a multipart upload.
@@ -142,7 +143,9 @@ impl StorageBackend for S3Backend {
         name: String,
         mut stream: &mut (dyn AsyncRead + Unpin + Send),
     ) -> ServerResult<RemoteFile> {
-        let first_chunk = read_chunk_async(&mut stream).await?;
+        let first_chunk = read_chunk_async(&mut stream, CHUNK_SIZE)
+            .await
+            .map_err(ServerError::storage_error)?;
 
         if first_chunk.len() < CHUNK_SIZE {
             // do a normal PutObject
@@ -207,7 +210,9 @@ impl StorageBackend for S3Backend {
         let chunk = if part_number == 1 {
             first_chunk.take().unwrap()
         } else {
-            read_chunk_async(&mut stream).await?
+            read_chunk_async(&mut stream, CHUNK_SIZE)
+                .await
+                .map_err(ServerError::storage_error)?
        };
 
         if chunk.is_empty() {
@@ -350,25 +355,3 @@ impl StorageBackend for S3Backend {
         }))
     }
 }
-
-// adapted from rust-s3
-async fn read_chunk_async<S: AsyncRead + Unpin + Send>(stream: &mut S) -> ServerResult<Vec<u8>> {
-    let mut chunk: Box<[u8]> = vec![0u8; CHUNK_SIZE].into_boxed_slice();
-    let mut cursor = 0;
-
-    while cursor < CHUNK_SIZE {
-        let buf = &mut chunk[cursor..];
-        let read = stream.read(buf).await.map_err(ServerError::storage_error)?;
-
-        if read == 0 {
-            break;
-        } else {
-            cursor += read;
-        }
-    }
-
-    let mut vec = chunk.into_vec();
-    vec.truncate(cursor);
-
-    Ok(vec)
-}
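For context, a minimal sketch of the control flow the s3.rs hunks above feed into, with assumed names and a placeholder part size (this is not code from the commit): the first chunk decides between a plain PutObject and a multipart upload, and since the shared helper now returns std::io::Result, callers map failures to ServerError::storage_error themselves. An empty chunk from read_chunk_async signals end-of-stream.

use attic::stream::read_chunk_async;
use tokio::io::AsyncRead;

// Placeholder value for illustration; the real constant lives in s3.rs.
const CHUNK_SIZE: usize = 8 * 1024 * 1024;

// Hypothetical outline of the upload path's chunking logic.
async fn upload_outline<S>(stream: &mut S) -> std::io::Result<()>
where
    S: AsyncRead + Unpin + Send,
{
    let first_chunk = read_chunk_async(stream, CHUNK_SIZE).await?;

    if first_chunk.len() < CHUNK_SIZE {
        // The whole stream fit in one chunk: a normal PutObject suffices.
        return Ok(());
    }

    // Otherwise start a multipart upload. Part 1 reuses `first_chunk`;
    // later parts are read on demand.
    let mut chunk = first_chunk;
    while !chunk.is_empty() {
        // ... UploadPart with `chunk` ...
        chunk = read_chunk_async(stream, CHUNK_SIZE).await?;
    }

    // EOF reached: complete the multipart upload.
    Ok(())
}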