DataFusion
Join的实现
type system
Use Apache Arrow as the basis of our type system
logical Plan
serialization
- JSON 等
- 与语言无关的序列化格式:Avro、Thrift、Protocol Buffers
- Substrait:关系代数的统一标准
// 初始化每个分区的 BooleanBufferBuilder
let num_rows = batch.num_rows();
let mut builders: Vec
// 单次循环设置所有分区的掩码
for (idx, &hash) in hash_buffer.iter().enumerate() {
let partition = (hash % *partitions as u64) as usize;
builders[partition].set_bit(idx, true); // 直接设置位值
}
// 转换为 BooleanArray
let masks: Vec<BooleanArray> = builders
.into_iter()
.map(|mut builder| {
BooleanArray::from(builder.finish())
})
.collect();
timer.done();
let partitioner_timer = &self.timer;
// 使用 filter 生成分区数据
let it = masks
.into_iter()
.enumerate()
.filter(|(_, mask)| mask.true_count() > 0)
.map(move |(partition, mask)| {
let _timer = partitioner_timer.timer();
let columns = batch.columns()
.iter()
.map(|col| filter(col, &mask).map_err(|e| DataFusionError::ArrowError(e, None)))
.collect::<Result<Vec<ArrayRef>>>()?;
RecordBatch::try_new(batch.schema(), columns)
.map(|batch| (partition, batch)).map_err(|e| DataFusionError::ArrowError(e, None))
});
Box::new(it)
// 2. 预分配列式缓冲区 (替代位掩码)
let mut partition_buffers: Vec<Vec<Box
// 3. 单次遍历填充数据
for (row_idx, &hash) in hash_buffer.iter().enumerate() {
let partition = (hash % *partitions as u64) as usize;
for (col_idx, array) in batch.columns().iter().enumerate() {
let builder = &mut partition_buffers[partition][col_idx];
append_value(array, row_idx, builder)?;
}
}
// Finished building index-arrays for output partitions
timer.done();
// Borrowing partitioner timer to prevent moving `self` to closure
let partitioner_timer = &self.timer;
// let it = indices
// .into_iter()
// .enumerate()
// .filter_map(|(partition, indices)| {
// let indices: PrimitiveArray<UInt32Type> = indices.into();
// (!indices.is_empty()).then_some((partition, indices))
// })
// .map(move |(partition, indices)| {
// // Tracking time required for repartitioned batches construction
// let _timer = partitioner_timer.timer();
//
// // Produce batches based on indices
// let columns = take_arrays(batch.columns(), &indices, None)?;
//
// let mut options = RecordBatchOptions::new();
// options = options.with_row_count(Some(indices.len()));
// let batch = RecordBatch::try_new_with_options(
// batch.schema(),
// columns,
// &options,
// )
// .unwrap();
//
// Ok((partition, batch))
// });
let schema = batch.schema();
let batches = partition_buffers
.into_iter()
.enumerate()
.filter(|(_, cols)| cols.iter().any(|c| c.len() > 0))
.map(move|(partition, cols)| {
let _timer = partitioner_timer.timer();
let arrays = cols.into_iter()
.map(|mut b| b.finish())
.collect::<Vec<ArrayRef>>();
RecordBatch::try_new(schema.clone(), arrays)
.map(|batch| (partition, batch))
.map_err(|e| DataFusionError::ArrowError(e, None))
});
Box::new(batches)
}
};
Ok(it)
}
/// Returns the number of output partitions.
///
/// Both matched states carry a `num_partitions` field; its value is returned unchanged.
fn num_partitions(&self) -> usize {
    // The two variants bind the same field name, so a single or-pattern arm covers both.
    match self.state {
        BatchPartitionerState::RoundRobin { num_partitions, .. }
        | BatchPartitionerState::Hash { num_partitions, .. } => num_partitions,
    }
}
}
// 修改 make_builder 函数以支持 Utf8View 类型
fn make_builder(data_type: &DataType, capacity: usize) -> Result<Box
// 修改 append_value 函数以支持 Utf8View 类型
fn append_value(array: &ArrayRef, index: usize, builder: &mut Box