Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/common/base/src/runtime/profile/profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::runtime::metrics::ScopedRegistry;
use crate::runtime::profile::ProfileStatisticsName;
use crate::runtime::ThreadTracker;

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ProfileLabel {
pub name: String,
pub value: Vec<String>,
Expand Down
8 changes: 4 additions & 4 deletions src/common/storage/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use serde::Deserialize;
use serde::Serialize;
use thiserror::Error;

#[derive(Default, Clone, Serialize, Deserialize)]
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct CopyStatus {
/// Key is file path.
pub files: DashMap<String, FileStatus>,
Expand All @@ -45,7 +45,7 @@ impl CopyStatus {
}
}

#[derive(Default, Clone, Serialize, Deserialize)]
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct FileStatus {
pub num_rows_loaded: usize,
pub error: Option<FileErrorsInfo>,
Expand Down Expand Up @@ -79,7 +79,7 @@ impl FileStatus {
}
}

#[derive(Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileErrorsInfo {
pub num_errors: usize,
pub first_error: FileParseErrorAtLine,
Expand Down Expand Up @@ -156,7 +156,7 @@ impl FileParseError {
}
}

#[derive(Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileParseErrorAtLine {
pub error: FileParseError,
pub line: usize,
Expand Down
2 changes: 1 addition & 1 deletion src/common/storage/src/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use serde::Deserialize;
use serde::Serialize;

#[derive(Default, Clone, Serialize, Deserialize)]
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct MutationStatus {
pub insert_rows: u64,
pub deleted_rows: u64,
Expand Down
2 changes: 1 addition & 1 deletion src/query/catalog/src/statistics/data_cache_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct DataCacheMetrics {
bytes_from_memory: AtomicUsize,
}

#[derive(Default, Clone, Serialize, Deserialize)]
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct DataCacheMetricValues {
pub bytes_from_remote_disk: usize,
pub bytes_from_local_disk: usize,
Expand Down
49 changes: 32 additions & 17 deletions src/query/expression/src/aggregate/aggregate_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ use super::AggrState;
use super::AggrStateLoc;
use super::AggrStateRegistry;
use super::StateAddr;
use crate::types::BinaryType;
use crate::types::DataType;
use crate::BlockEntry;
use crate::ColumnBuilder;
use crate::ColumnView;
use crate::ProjectedBlock;
use crate::Scalar;
use crate::ScalarRef;
use crate::StateSerdeItem;
use crate::StateSerdeType;

pub type AggregateFunctionRef = Arc<dyn AggregateFunction>;

Expand Down Expand Up @@ -69,32 +70,51 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
// Used in aggregate_null_adaptor
fn accumulate_row(&self, place: AggrState, columns: ProjectedBlock, row: usize) -> Result<()>;

fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()>;
fn serialize_type(&self) -> Vec<StateSerdeItem>;

fn serialize_size_per_row(&self) -> Option<usize> {
None
fn serialize_data_type(&self) -> DataType {
let serde_type = StateSerdeType::new(self.serialize_type());
serde_type.data_type()
}

fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()>;
fn serialize(&self, place: AggrState, builders: &mut [ColumnBuilder]) -> Result<()> {
let binary_builder = builders[0].as_binary_mut().unwrap();
self.serialize_binary(place, &mut binary_builder.data)?;
binary_builder.commit_row();
Ok(())
}

fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()>;

fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()> {
let mut binary = *data[0].as_binary().unwrap();
self.merge_binary(place, &mut binary)
}

fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()>;

/// Batch merge and deserialize the state from binary array
fn batch_merge(
&self,
places: &[StateAddr],
loc: &[AggrStateLoc],
state: &ColumnView<BinaryType>,
state: &BlockEntry,
) -> Result<()> {
for (place, mut data) in places.iter().zip(state.iter()) {
self.merge(AggrState::new(*place, loc), &mut data)?;
let column = state.to_column();
for (place, data) in places.iter().zip(column.iter()) {
self.merge(
AggrState::new(*place, loc),
data.as_tuple().unwrap().as_slice(),
)?;
}

Ok(())
}

fn batch_merge_single(&self, place: AggrState, state: &BlockEntry) -> Result<()> {
let view = state.downcast::<BinaryType>().unwrap();
for mut data in view.iter() {
self.merge(place, &mut data)?;
let column = state.to_column();
for data in column.iter() {
self.merge(place, data.as_tuple().unwrap().as_slice())?;
}
Ok(())
}
Expand Down Expand Up @@ -149,9 +169,4 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
fn get_if_condition(&self, _columns: ProjectedBlock) -> Option<Bitmap> {
None
}

// some features
fn convert_const_to_full(&self) -> bool {
true
}
}
60 changes: 53 additions & 7 deletions src/query/expression/src/aggregate/aggregate_function_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use enum_as_inner::EnumAsInner;

use super::AggregateFunctionRef;
use crate::types::binary::BinaryColumnBuilder;
use crate::types::DataType;
use crate::ColumnBuilder;

#[derive(Clone, Copy, Debug)]
pub struct StateAddr {
Expand Down Expand Up @@ -113,11 +115,11 @@ impl From<StateAddr> for usize {

pub fn get_states_layout(funcs: &[AggregateFunctionRef]) -> Result<StatesLayout> {
let mut registry = AggrStateRegistry::default();
let mut serialize_size = Vec::with_capacity(funcs.len());
let mut serialize_type = Vec::with_capacity(funcs.len());
for func in funcs {
func.register_state(&mut registry);
registry.commit();
serialize_size.push(func.serialize_size_per_row());
serialize_type.push(StateSerdeType(func.serialize_type().into()));
}

let AggrStateRegistry { states, offsets } = registry;
Expand All @@ -132,7 +134,7 @@ pub fn get_states_layout(funcs: &[AggregateFunctionRef]) -> Result<StatesLayout>
Ok(StatesLayout {
layout,
states_loc,
serialize_size,
serialize_type,
})
}

Expand Down Expand Up @@ -191,18 +193,62 @@ impl AggrStateLoc {
}
}

/// One component of an aggregate function's serialized state.
///
/// An aggregate function reports a list of these items from `serialize_type`;
/// each item becomes one field of the tuple type produced by
/// `StateSerdeType::data_type`.
#[derive(Debug, Clone)]
pub enum StateSerdeItem {
/// A component stored as a column of exactly this data type.
DataType(DataType),
/// A component stored as a `DataType::Binary` column. The optional value is
/// multiplied by the row count to pre-size the binary builder's data buffer
/// (`None` is treated as 0) — presumably a per-row size in bytes; confirm
/// against `BinaryColumnBuilder::with_capacity`.
Binary(Option<usize>),
}

/// Ordered list of [`StateSerdeItem`]s describing how a single aggregate
/// function's state is serialized. `data_type` turns the list into a
/// `DataType::Tuple` with one field per item, in the same order.
#[derive(Debug, Clone)]
pub struct StateSerdeType(Box<[StateSerdeItem]>);

impl StateSerdeType {
pub fn new(items: impl Into<Box<[StateSerdeItem]>>) -> Self {
StateSerdeType(items.into())
}

pub fn data_type(&self) -> DataType {
DataType::Tuple(
self.0
.iter()
.map(|item| match item {
StateSerdeItem::DataType(data_type) => data_type.clone(),
StateSerdeItem::Binary(_) => DataType::Binary,
})
.collect(),
)
}
}

#[derive(Debug, Clone)]
pub struct StatesLayout {
pub layout: Layout,
pub states_loc: Vec<Box<[AggrStateLoc]>>,
serialize_size: Vec<Option<usize>>,
pub(super) serialize_type: Vec<StateSerdeType>,
}

impl StatesLayout {
pub fn serialize_builders(&self, num_rows: usize) -> Vec<BinaryColumnBuilder> {
self.serialize_size
pub fn serialize_builders(&self, num_rows: usize) -> Vec<ColumnBuilder> {
self.serialize_type
.iter()
.map(|size| BinaryColumnBuilder::with_capacity(num_rows, num_rows * size.unwrap_or(0)))
.map(|serde_type| {
let builder = serde_type
.0
.iter()
.map(|item| match item {
StateSerdeItem::DataType(data_type) => {
ColumnBuilder::with_capacity(data_type, num_rows)
}
StateSerdeItem::Binary(size) => {
ColumnBuilder::Binary(BinaryColumnBuilder::with_capacity(
num_rows,
num_rows * size.unwrap_or(0),
))
}
})
.collect();
ColumnBuilder::Tuple(builder)
})
.collect()
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/query/expression/src/aggregate/aggregate_hashtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use crate::aggregate::payload_row::row_match_columns;
use crate::group_hash_columns;
use crate::new_sel;
use crate::read;
use crate::types::BinaryType;
use crate::types::DataType;
use crate::AggregateFunctionRef;
use crate::BlockEntry;
Expand Down Expand Up @@ -219,7 +218,7 @@ impl AggregateHashTable {
.zip(agg_states.iter())
.zip(states_layout.states_loc.iter())
{
func.batch_merge(state_places, loc, &state.downcast::<BinaryType>().unwrap())?;
func.batch_merge(state_places, loc, state)?;
}
}
}
Expand Down
14 changes: 9 additions & 5 deletions src/query/expression/src/aggregate/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,11 +421,15 @@ impl Payload {
true
}

pub fn empty_block(&self, fake_rows: Option<usize>) -> DataBlock {
let fake_rows = fake_rows.unwrap_or(0);
let entries = (0..self.aggrs.len())
.map(|_| {
ColumnBuilder::repeat_default(&DataType::Binary, fake_rows)
pub fn empty_block(&self, fake_rows: usize) -> DataBlock {
assert_eq!(self.aggrs.is_empty(), self.states_layout.is_none());
let entries = self
.states_layout
.as_ref()
.iter()
.flat_map(|layout| layout.serialize_type.iter())
.map(|serde_type| {
ColumnBuilder::repeat_default(&serde_type.data_type(), fake_rows)
.build()
.into()
})
Expand Down
15 changes: 5 additions & 10 deletions src/query/expression/src/aggregate/payload_flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl Payload {
}

if blocks.is_empty() {
return Ok(self.empty_block(None));
return Ok(self.empty_block(0));
}
DataBlock::concat(&blocks)
}
Expand All @@ -149,18 +149,13 @@ impl Payload {
.enumerate()
{
{
let builder = &mut builders[idx];
func.serialize(AggrState::new(*place, loc), &mut builder.data)?;
builder.commit_row();
let builders = builders[idx].as_tuple_mut().unwrap().as_mut_slice();
func.serialize(AggrState::new(*place, loc), builders)?;
}
}
}

entries.extend(
builders
.into_iter()
.map(|builder| Column::Binary(builder.build()).into()),
);
entries.extend(builders.into_iter().map(|builder| builder.build().into()));
}

entries.extend_from_slice(&state.take_group_columns());
Expand All @@ -177,7 +172,7 @@ impl Payload {
}

if blocks.is_empty() {
return Ok(self.empty_block(None));
return Ok(self.empty_block(0));
}

DataBlock::concat(&blocks)
Expand Down
Loading
Loading