1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
use std::io::Read;
use avro_schema::file::FileMetadata;
use avro_schema::read::fallible_streaming_iterator::FallibleStreamingIterator;
use avro_schema::read::{block_iterator, BlockStreamingIterator};
use avro_schema::schema::Field as AvroField;
mod deserialize;
pub use deserialize::deserialize;
mod nested;
mod schema;
mod util;
pub use schema::infer_schema;
use crate::array::Array;
use crate::chunk::Chunk;
use crate::datatypes::Field;
use crate::error::Result;
/// Reads an Avro stream block by block, turning each block into a
/// [`Chunk`] of arrays when iterated.
pub struct Reader<R>
where
    R: Read,
{
    /// Streaming iterator over the blocks read from the underlying `R`.
    iter: BlockStreamingIterator<R>,
    /// Avro-side field descriptions, taken from the file metadata's record.
    avro_fields: Vec<AvroField>,
    /// Fields handed to `deserialize` together with `avro_fields`.
    fields: Vec<Field>,
    /// One flag per field, forwarded to `deserialize`.
    /// NOTE(review): presumably `true` means "keep this column" — confirm
    /// against `deserialize`.
    projection: Vec<bool>,
}
impl<R: Read> Reader<R> {
    /// Creates a new [`Reader`] over `reader`.
    ///
    /// * `reader` - source the blocks are read from.
    /// * `metadata` - file metadata; its compression and marker configure the
    ///   block iterator, and its record fields are kept for deserialization.
    /// * `fields` - fields passed to `deserialize` for every block.
    /// * `projection` - optional per-field flags; when `None`, a flag of
    ///   `true` is generated for every entry of `fields`.
    pub fn new(
        reader: R,
        metadata: FileMetadata,
        fields: Vec<Field>,
        projection: Option<Vec<bool>>,
    ) -> Self {
        // Without an explicit projection every field is flagged `true`.
        let projection = match projection {
            Some(mask) => mask,
            None => vec![true; fields.len()],
        };
        let iter = block_iterator(reader, metadata.compression, metadata.marker);
        let avro_fields = metadata.record.fields;

        Self {
            iter,
            avro_fields,
            fields,
            projection,
        }
    }

    /// Consumes this [`Reader`] and returns the wrapped `R` by delegating to
    /// the block iterator's `into_inner`.
    pub fn into_inner(self) -> R {
        let Self { iter, .. } = self;
        iter.into_inner()
    }
}
impl<R: Read> Iterator for Reader<R> {
    type Item = Result<Chunk<Box<dyn Array>>>;

    /// Advances the block iterator and deserializes the next block.
    ///
    /// Returns `None` once the block iterator is exhausted, `Some(Err(_))`
    /// when advancing it fails (the error is converted into this crate's
    /// error type), and otherwise the result of `deserialize` on the block.
    fn next(&mut self) -> Option<Self::Item> {
        let schema_fields = &self.fields[..];
        let avro_schema_fields = &self.avro_fields;
        let mask = &self.projection;

        match self.iter.next() {
            Ok(Some(block)) => Some(deserialize(
                block,
                schema_fields,
                avro_schema_fields,
                mask,
            )),
            Ok(None) => None,
            Err(error) => Some(Err(error.into())),
        }
    }
}