1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
//! APIs to read from Avro format to arrow.
use std::io::Read;

use avro_schema::file::FileMetadata;
use avro_schema::read::fallible_streaming_iterator::FallibleStreamingIterator;
use avro_schema::read::{block_iterator, BlockStreamingIterator};
use avro_schema::schema::Field as AvroField;

mod deserialize;
pub use deserialize::deserialize;
mod nested;
mod schema;
mod util;

pub use schema::infer_schema;

use crate::array::Array;
use crate::chunk::Chunk;
use crate::datatypes::Field;
use crate::error::Result;

/// Single threaded, blocking reader of Avro; [`Iterator`] of [`Chunk`].
///
/// Each call to [`Iterator::next`] pulls one block from the underlying
/// block iterator and hands it to [`deserialize`] together with the
/// schema information and projection stored here.
pub struct Reader<R: Read> {
    /// Streaming iterator over the blocks read from `R`.
    iter: BlockStreamingIterator<R>,
    /// Avro-side field descriptions, passed to [`deserialize`] alongside `fields`.
    avro_fields: Vec<AvroField>,
    /// Arrow-side fields, passed to [`deserialize`].
    fields: Vec<Field>,
    /// One flag per field, passed to [`deserialize`].
    // NOTE(review): presumably `true` means "include this column" — confirm in `deserialize`.
    projection: Vec<bool>,
}

impl<R: Read> Reader<R> {
    /// Builds a [`Reader`] over `reader`.
    ///
    /// * `metadata` supplies the compression, the marker and the Avro record
    ///   fields used while iterating over blocks.
    /// * `fields` are the arrow [`Field`]s handed to [`deserialize`].
    /// * `projection` is an optional per-field mask; when it is `None`, a mask
    ///   with one `true` entry per element of `fields` is used instead.
    pub fn new(
        reader: R,
        metadata: FileMetadata,
        fields: Vec<Field>,
        projection: Option<Vec<bool>>,
    ) -> Self {
        // No explicit mask: flag every field.
        let projection = match projection {
            Some(mask) => mask,
            None => vec![true; fields.len()],
        };

        let iter = block_iterator(reader, metadata.compression, metadata.marker);
        let avro_fields = metadata.record.fields;

        Self {
            iter,
            avro_fields,
            fields,
            projection,
        }
    }

    /// Consumes the [`Reader`] and returns the wrapped `R`.
    pub fn into_inner(self) -> R {
        let Self { iter, .. } = self;
        iter.into_inner()
    }
}

impl<R: Read> Iterator for Reader<R> {
    type Item = Result<Chunk<Box<dyn Array>>>;

    /// Advances the block iterator and deserializes the block it yields.
    ///
    /// Returns `None` once the block iterator is exhausted, `Some(Err(_))` if
    /// advancing it or deserializing the block fails, and `Some(Ok(chunk))`
    /// otherwise.
    fn next(&mut self) -> Option<Self::Item> {
        let block = match self.iter.next() {
            Ok(Some(block)) => block,
            // The underlying iterator has no more blocks.
            Ok(None) => return None,
            // Surface the read error as this iterator's item.
            Err(error) => return Some(Err(error.into())),
        };

        Some(deserialize(
            block,
            &self.fields[..],
            &self.avro_fields,
            &self.projection,
        ))
    }
}