Avro read
When compiled with feature io_avro_async
, you can use this crate to read Avro files
asynchronously.
use std::sync::Arc;
use futures::pin_mut;
use futures::StreamExt;
use tokio::fs::File;
use tokio_util::compat::*;
use arrow2::error::Result;
use arrow2::io::avro::avro_schema::file::Block;
use arrow2::io::avro::avro_schema::read_async::{block_stream, decompress_block, read_metadata};
use arrow2::io::avro::read::{deserialize, infer_schema};
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
use std::env;
let args: Vec<String> = env::args().collect();
let file_path = &args[1];
let mut reader = File::open(file_path).await?.compat();
let metadata = read_metadata(&mut reader).await?;
let schema = infer_schema(&metadata.record)?;
let metadata = Arc::new(metadata);
let projection = Arc::new(schema.fields.iter().map(|_| true).collect::<Vec<_>>());
let blocks = block_stream(&mut reader, metadata.marker).await;
pin_mut!(blocks);
while let Some(mut block) = blocks.next().await.transpose()? {
let schema = schema.clone();
let metadata = metadata.clone();
let projection = projection.clone();
// the content here is CPU-bounded. It should run on a dedicated thread pool
let handle = tokio::task::spawn_blocking(move || {
let mut decompressed = Block::new(0, vec![]);
decompress_block(&mut block, &mut decompressed, metadata.compression)?;
deserialize(
&decompressed,
&schema.fields,
&metadata.record.fields,
&projection,
)
});
let chunk = handle.await.unwrap()?;
assert!(!chunk.is_empty());
}
Ok(())
}
Note how both decompression and deserialization is performed on a separate thread pool to not block (see also here).