1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines partition kernel for [`crate::array::Array`]

use crate::array::ord::DynComparator;
use crate::compute::sort::{build_compare, SortColumn};
use crate::error::{Error, Result};
use std::cmp::Ordering;
use std::iter::Iterator;
use std::ops::Range;

/// Splits already-sorted columns into ranges of lexicographically equal rows.
///
/// Rows are compared column by column, and every returned [`Range`] covers one
/// run of consecutive rows that compare equal across all of `columns`.
/// Consecutive ranges are contiguous: one range's `end` is the next range's
/// `start`. The first range starts at `0` and the last one ends at the number
/// of rows.
///
/// The partition boundaries are located with a binary search, so the columns
/// *MUST* already be sorted; no check is made that they are.
///
/// # Errors
///
/// Returns an error when `columns` is empty, when the columns do not all have
/// the same number of rows, or when a comparator cannot be built for one of
/// the columns.
pub fn lexicographical_partition_ranges(
    columns: &[SortColumn],
) -> Result<impl Iterator<Item = Range<usize>>> {
    let partitions = LexicographicalPartitionIterator::try_new(columns)?;
    Ok(partitions)
}

/// Iterator state used to walk pre-sorted columns and yield, one at a time,
/// the ranges of rows that compare equal.
struct LexicographicalPartitionIterator {
    /// Compares two rows, identified by their indices, across all columns.
    comparator: DynComparator,
    /// Number of rows in every column; iteration stops once
    /// `partition_point` reaches this value.
    num_rows: usize,
    /// End of the most recently yielded range, i.e. the start of the next one.
    previous_partition_point: usize,
    /// Index of the first row that has not been assigned to a range yet.
    partition_point: usize,
    /// The row indices `0..num_rows`, kept as a slice so that the remaining
    /// rows can be binary searched with `partition_point`.
    value_indices: Vec<usize>,
}

impl LexicographicalPartitionIterator {
    fn try_new(columns: &[SortColumn]) -> Result<Self> {
        if columns.is_empty() {
            return Err(Error::InvalidArgumentError(
                "Sort requires at least one column".to_string(),
            ));
        }
        let num_rows = columns[0].values.len();
        if columns.iter().any(|item| item.values.len() != num_rows) {
            return Err(Error::InvalidArgumentError(
                "Lexical sort columns have different row counts".to_string(),
            ));
        };

        let comparators = columns
            .iter()
            .map(|x| build_compare(x.values, x.options.unwrap_or_default()))
            .collect::<Result<Vec<_>>>()?;

        let comparator = Box::new(move |a_idx: usize, b_idx: usize| -> Ordering {
            for comparator in comparators.iter() {
                match comparator(a_idx, b_idx) {
                    Ordering::Equal => continue,
                    other => return other,
                }
            }

            Ordering::Equal
        });

        let value_indices = (0..num_rows).collect::<Vec<usize>>();
        Ok(Self {
            comparator,
            num_rows,
            previous_partition_point: 0,
            partition_point: 0,
            value_indices,
        })
    }
}

impl Iterator for LexicographicalPartitionIterator {
    type Item = Range<usize>;

    /// Yields the next range of rows that compare equal, or `None` once every
    /// row has been assigned to a range.
    fn next(&mut self) -> Option<Self::Item> {
        if self.partition_point >= self.num_rows {
            return None;
        }

        // The first row that has not been emitted yet is the reference row of
        // the partition being built.
        let pivot = self.partition_point;

        // Every row before `pivot` already belongs to an emitted range, so only
        // the tail `value_indices[pivot..]` is searched. The binary search
        // returns how many leading rows of that tail do not compare greater
        // than the pivot row. That count is relative to the tail, so it is
        // added to `pivot` to get an absolute row index.
        //
        // Each element of `value_indices` is itself a row index (`0..num_rows`),
        // which is what the comparator expects as its arguments.
        let offset = self.value_indices[pivot..]
            .partition_point(|&row| (self.comparator)(row, pivot) != Ordering::Greater);

        let start = self.previous_partition_point;
        let end = pivot + offset;

        self.partition_point = end;
        self.previous_partition_point = end;

        Some(start..end)
    }
}