use parquet2::error::Error as ParquetError;
use parquet2::schema::types::ParquetType;
use parquet2::write::Compressor;
use parquet2::FallibleStreamingIterator;

use crate::{
    array::Array,
    chunk::Chunk,
    datatypes::Schema,
    error::{Error, Result},
};

use super::{
    array_to_columns, to_parquet_schema, DynIter, DynStreamingIterator, Encoding, RowGroupIter,
    SchemaDescriptor, WriteOptions,
};
/// Maps a [`Chunk`] and parquet-specific [`WriteOptions`] to a [`RowGroupIter`]:
/// an iterator of columns, each itself a streaming iterator of compressed pages.
pub fn row_group_iter<A: AsRef<dyn Array> + 'static + Send + Sync>(
    chunk: Chunk<A>,
    encodings: Vec<Vec<Encoding>>,
    fields: Vec<ParquetType>,
    options: WriteOptions,
) -> RowGroupIter<'static, Error> {
    assert_eq!(encodings.len(), fields.len());
    assert_eq!(encodings.len(), chunk.arrays().len());
    DynIter::new(
        chunk
            .into_arrays()
            .into_iter()
            .zip(fields)
            .zip(encodings)
            .flat_map(move |((array, type_), encoding)| {
                // Encode the array into one set of pages per (leaf) column.
                // NOTE: this panics if the array does not match `type_`;
                // callers are expected to have validated the schema upfront.
                let encoded_columns = array_to_columns(array, type_, options, &encoding).unwrap();
                encoded_columns
                    .into_iter()
                    .map(|encoded_pages| {
                        // Normalize encoding errors into parquet2's error type.
                        let pages = DynIter::new(
                            encoded_pages
                                .into_iter()
                                .map(|x| x.map_err(|e| ParquetError::OutOfSpec(e.to_string()))),
                        );

                        // Compress pages lazily, as the writer pulls them.
                        let compressed_pages = Compressor::new(pages, options.compression, vec![])
                            .map_err(Error::from);
                        Ok(DynStreamingIterator::new(compressed_pages))
                    })
                    .collect::<Vec<_>>()
            }),
    )
}
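
// Consumption sketch (illustrative; not part of the original file): each item
// of the returned `RowGroupIter` is one column, and each column is a
// `FallibleStreamingIterator` over compressed pages. A writer drains it like
// this, where `sink_page` is a hypothetical stand-in for whatever receives
// the pages:
//
//     for column in row_group_iter(chunk, encodings, fields, options) {
//         let mut pages = column?;
//         while let Some(page) = pages.next()? {
//             sink_page(page)?;
//         }
//     }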

/// An iterator adapter that converts an iterator of [`Chunk`]s into an
/// iterator of row groups ready to be written to a parquet file.
pub struct RowGroupIterator<A: AsRef<dyn Array> + 'static, I: Iterator<Item = Result<Chunk<A>>>> {
    iter: I,
    options: WriteOptions,
    parquet_schema: SchemaDescriptor,
    encodings: Vec<Vec<Encoding>>,
}

impl<A: AsRef<dyn Array> + 'static, I: Iterator<Item = Result<Chunk<A>>>> RowGroupIterator<A, I> {
    /// Creates a new [`RowGroupIterator`] from an iterator of [`Chunk`]s.
    ///
    /// # Errors
    /// Errors if the number of encodings does not equal the number of fields
    /// in the schema, or if the schema cannot be mapped to a parquet schema.
    pub fn try_new(
        iter: I,
        schema: &Schema,
        options: WriteOptions,
        encodings: Vec<Vec<Encoding>>,
    ) -> Result<Self> {
        if encodings.len() != schema.fields.len() {
            return Err(Error::InvalidArgumentError(
                "The number of encodings must equal the number of fields".to_string(),
            ));
        }
        let parquet_schema = to_parquet_schema(schema)?;

        Ok(Self {
            iter,
            options,
            parquet_schema,
            encodings,
        })
    }

    /// Returns the parquet [`SchemaDescriptor`] of the row groups.
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        &self.parquet_schema
    }
}

impl<A: AsRef<dyn Array> + 'static + Send + Sync, I: Iterator<Item = Result<Chunk<A>>>> Iterator
    for RowGroupIterator<A, I>
{
    type Item = Result<RowGroupIter<'static, Error>>;

    fn next(&mut self) -> Option<Self::Item> {
        let options = self.options;

        self.iter.next().map(|maybe_chunk| {
            let chunk = maybe_chunk?;
            if self.encodings.len() != chunk.arrays().len() {
                return Err(Error::InvalidArgumentError(
                    "The number of arrays in the chunk must equal the number of fields in the schema"
                        .to_string(),
                ));
            }
            let encodings = self.encodings.clone();
            Ok(row_group_iter(
                chunk,
                encodings,
                self.parquet_schema.fields().to_vec(),
                options,
            ))
        })
    }
}
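
// A minimal end-to-end sketch (added for illustration; not part of the
// original file): drive a `RowGroupIterator` into a `FileWriter`. It assumes
// the `FileWriter`, `CompressionOptions`, and `Version` re-exports of this
// module's parent, and the `WriteOptions` field set of recent releases;
// adjust the field list if your version differs.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::array::Int32Array;
    use crate::datatypes::{DataType, Field};
    use crate::io::parquet::write::{CompressionOptions, FileWriter, Version};

    #[test]
    fn writes_a_single_row_group() -> Result<()> {
        let schema = Schema::from(vec![Field::new("c1", DataType::Int32, false)]);
        let chunk = Chunk::new(vec![
            Box::new(Int32Array::from_slice([1, 2, 3])) as Box<dyn Array>
        ]);

        let options = WriteOptions {
            write_statistics: true,
            compression: CompressionOptions::Uncompressed,
            version: Version::V2,
            data_pagesize_limit: None,
        };

        // One chunk in, one row group out; one encoding per field.
        let row_groups = RowGroupIterator::try_new(
            vec![Ok(chunk)].into_iter(),
            &schema,
            options,
            vec![vec![Encoding::Plain]],
        )?;

        // Write to an in-memory buffer; any `std::io::Write` works here.
        let mut writer = FileWriter::try_new(std::io::Cursor::new(vec![]), schema, options)?;
        for group in row_groups {
            writer.write(group?)?;
        }
        writer.end(None)?;
        Ok(())
    }
}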