Skip to content

Commit b3ea7a9

Browse files
committed
batch_serialize
1 parent 36fcb59 commit b3ea7a9

File tree

9 files changed

+135
-43
lines changed

9 files changed

+135
-43
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/expression/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ educe = { workspace = true }
3333
either = { workspace = true }
3434
enum-as-inner = { workspace = true }
3535
ethnum = { workspace = true, features = ["serde", "macros"] }
36-
fastrace = { workspace = true }
3736
futures = { workspace = true }
3837
geo = { workspace = true }
3938
geozero = { workspace = true }

src/query/expression/src/aggregate/aggregate_function.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,18 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
8484

8585
fn serialize(&self, place: AggrState, builders: &mut [ColumnBuilder]) -> Result<()>;
8686

87+
/// Serializes the aggregate state at every address in `places` into `builders`.
///
/// This is the default implementation: for each address, in slice order, it builds an
/// `AggrState` from that address and the shared `loc`, and calls `serialize` on it with
/// the same `builders`. Implementations can override it (the adaptors and combinators in
/// this change do).
///
/// # Errors
///
/// Returns the first error produced by `serialize`; later places are not visited.
fn batch_serialize(
88+
&self,
89+
places: &[StateAddr],
90+
loc: &[AggrStateLoc],
91+
builders: &mut [ColumnBuilder],
92+
) -> Result<()> {
93+
for place in places {
94+
self.serialize(AggrState::new(*place, loc), builders)?;
95+
}
96+
Ok(())
97+
}
98+
8799
fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()>;
88100

89101
/// Batch deserialize the state and merge

src/query/expression/src/aggregate/aggregate_hashtable.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,6 @@ impl AggregateHashTable {
167167
}
168168
}
169169

170-
#[fastrace::trace(name = "AggregateHashTable::add_groups_inner")]
171170
// Add new groups and combine the states
172171
fn add_groups_inner(
173172
&mut self,
@@ -382,7 +381,6 @@ impl AggregateHashTable {
382381
Ok(())
383382
}
384383

385-
#[fastrace::trace(name = "AggregateHashTable::combine_payload")]
386384
pub fn combine_payload(
387385
&mut self,
388386
payload: &Payload,

src/query/expression/src/aggregate/payload_flush.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use databend_common_io::prelude::bincode_deserialize_from_slice;
1818
use super::partitioned_payload::PartitionedPayload;
1919
use super::payload::Payload;
2020
use super::probe_state::ProbeState;
21-
use super::AggrState;
2221
use crate::read;
2322
use crate::types::binary::BinaryColumn;
2423
use crate::types::binary::BinaryColumnBuilder;
@@ -141,18 +140,14 @@ impl Payload {
141140
if let Some(state_layout) = self.states_layout.as_ref() {
142141
let mut builders = state_layout.serialize_builders(row_count);
143142

144-
for place in state.state_places.as_slice()[0..row_count].iter() {
145-
for (idx, (loc, func)) in state_layout
146-
.states_loc
147-
.iter()
148-
.zip(self.aggrs.iter())
149-
.enumerate()
150-
{
151-
{
152-
let builders = builders[idx].as_tuple_mut().unwrap().as_mut_slice();
153-
func.serialize(AggrState::new(*place, loc), builders)?;
154-
}
155-
}
143+
for ((loc, func), builder) in state_layout
144+
.states_loc
145+
.iter()
146+
.zip(self.aggrs.iter())
147+
.zip(builders.iter_mut())
148+
{
149+
let builders = builder.as_tuple_mut().unwrap().as_mut_slice();
150+
func.batch_serialize(&state.state_places.as_slice()[0..row_count], loc, builders)?;
156151
}
157152

158153
entries.extend(builders.into_iter().map(|builder| builder.build().into()));

src/query/functions/src/aggregates/adaptors/aggregate_null_adaptor.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,15 @@ impl<const NULLABLE_RESULT: bool> AggregateFunction for AggregateNullUnaryAdapto
191191
self.0.serialize(place, builders)
192192
}
193193

194+
/// Serializes the states at `places` by forwarding the call, with all arguments
/// unchanged, to the wrapped value `self.0`.
fn batch_serialize(
195+
&self,
196+
places: &[StateAddr],
197+
loc: &[AggrStateLoc],
198+
builders: &mut [ColumnBuilder],
199+
) -> Result<()> {
200+
self.0.batch_serialize(places, loc, builders)
201+
}
202+
194203
/// Merges `data` into the state at `place` by forwarding to the wrapped value `self.0`.
fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()> {
195204
self.0.merge(place, data)
196205
}
@@ -318,6 +327,15 @@ impl<const NULLABLE_RESULT: bool> AggregateFunction
318327
self.0.serialize(place, builders)
319328
}
320329

330+
/// Serializes the states at `places` by forwarding the call, with all arguments
/// unchanged, to the wrapped value `self.0`.
fn batch_serialize(
331+
&self,
332+
places: &[StateAddr],
333+
loc: &[AggrStateLoc],
334+
builders: &mut [ColumnBuilder],
335+
) -> Result<()> {
336+
self.0.batch_serialize(places, loc, builders)
337+
}
338+
321339
/// Merges `data` into the state at `place` by forwarding to the wrapped value `self.0`.
fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()> {
322340
self.0.merge(place, data)
323341
}
@@ -530,6 +548,29 @@ impl<const NULLABLE_RESULT: bool> CommonNullAdaptor<NULLABLE_RESULT> {
530548
.serialize(place.remove_last_loc(), &mut builders[..(n - 1)])
531549
}
532550

551+
/// Serializes the states at `places` column-wise: first one flag per place, then the
/// nested function's state for all places.
///
/// When `NULLABLE_RESULT` is `false` the call is forwarded to `self.nested` untouched.
/// Otherwise the last entry of `builders` receives `get_flag(..)` for every place, and the
/// nested function is given `loc` and `builders` with their last entry removed.
///
/// # Panics
///
/// Panics (via `unwrap`) if `builders` is empty or its last entry is not a boolean
/// builder. `loc.len() - 1` assumes `loc` is non-empty.
// NOTE(review): the flag presumably records whether the state holds a non-null value;
// `get_flag` is defined outside this hunk, so confirm against its definition.
fn batch_serialize(
552+
&self,
553+
places: &[StateAddr],
554+
loc: &[AggrStateLoc],
555+
builders: &mut [ColumnBuilder],
556+
) -> Result<()> {
557+
// No extra flag column in this configuration: delegate as-is.
if !NULLABLE_RESULT {
558+
return self.nested.batch_serialize(places, loc, builders);
559+
}
560+
let n = builders.len();
561+
// Debug-only check: the nested function's builders plus one flag builder.
debug_assert_eq!(self.nested.serialize_type().len() + 1, n);
562+
let flag_builder = builders
563+
.last_mut()
564+
.and_then(ColumnBuilder::as_boolean_mut)
565+
.unwrap();
566+
// Push all flags first; the nested states are appended afterwards in one batch call.
for place in places {
567+
let place = AggrState::new(*place, loc);
568+
flag_builder.push(get_flag(place));
569+
}
570+
// The nested function sees neither the last `loc` entry nor the flag builder.
self.nested
571+
.batch_serialize(places, &loc[..loc.len() - 1], &mut builders[..(n - 1)])
572+
}
573+
533574
fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()> {
534575
if !NULLABLE_RESULT {
535576
return self.nested.merge(place, data);

src/query/functions/src/aggregates/adaptors/aggregate_ornull_adaptor.rs

Lines changed: 46 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -40,23 +40,23 @@ use super::StateAddr;
4040
/// Use a single additional byte of data after the nested function data:
4141
/// 0 means there was no input, 1 means there was some.
4242
pub struct AggregateFunctionOrNullAdaptor {
43-
inner: AggregateFunctionRef,
43+
nested: AggregateFunctionRef,
4444
inner_nullable: bool,
4545
}
4646

4747
impl AggregateFunctionOrNullAdaptor {
4848
pub fn create(
49-
inner: AggregateFunctionRef,
49+
nested: AggregateFunctionRef,
5050
features: AggregateFunctionFeatures,
5151
) -> Result<AggregateFunctionRef> {
5252
// count/count distinct should not be nullable for empty set, just return zero
53-
let inner_return_type = inner.return_type()?;
53+
let inner_return_type = nested.return_type()?;
5454
if features.returns_default_when_only_null || inner_return_type == DataType::Null {
55-
return Ok(inner);
55+
return Ok(nested);
5656
}
5757

5858
Ok(Arc::new(AggregateFunctionOrNullAdaptor {
59-
inner,
59+
nested,
6060
inner_nullable: inner_return_type.is_nullable(),
6161
}))
6262
}
@@ -83,22 +83,22 @@ fn flag_offset(place: AggrState) -> usize {
8383

8484
impl AggregateFunction for AggregateFunctionOrNullAdaptor {
8585
fn name(&self) -> &str {
86-
self.inner.name()
86+
self.nested.name()
8787
}
8888

8989
fn return_type(&self) -> Result<DataType> {
90-
Ok(self.inner.return_type()?.wrap_nullable())
90+
Ok(self.nested.return_type()?.wrap_nullable())
9191
}
9292

9393
#[inline]
9494
fn init_state(&self, place: AggrState) {
9595
let c = place.addr.next(flag_offset(place)).get::<u8>();
9696
*c = 0;
97-
self.inner.init_state(place.remove_last_loc())
97+
self.nested.init_state(place.remove_last_loc())
9898
}
9999

100100
fn register_state(&self, registry: &mut AggrStateRegistry) {
101-
self.inner.register_state(registry);
101+
self.nested.register_state(registry);
102102
registry.register(AggrStateType::Bool);
103103
}
104104

@@ -114,7 +114,7 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor {
114114
return Ok(());
115115
}
116116

117-
let if_cond = self.inner.get_if_condition(columns);
117+
let if_cond = self.nested.get_if_condition(columns);
118118

119119
let validity = match (if_cond, validity) {
120120
(None, None) => None,
@@ -129,7 +129,7 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor {
129129
.unwrap_or(true)
130130
{
131131
set_flag(place, true);
132-
self.inner.accumulate(
132+
self.nested.accumulate(
133133
place.remove_last_loc(),
134134
columns,
135135
validity.as_ref(),
@@ -146,9 +146,9 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor {
146146
columns: ProjectedBlock,
147147
input_rows: usize,
148148
) -> Result<()> {
149-
self.inner
149+
self.nested
150150
.accumulate_keys(places, &loc[..loc.len() - 1], columns, input_rows)?;
151-
let if_cond = self.inner.get_if_condition(columns);
151+
let if_cond = self.nested.get_if_condition(columns);
152152

153153
match if_cond {
154154
Some(v) if v.null_count() > 0 => {
@@ -175,14 +175,14 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor {
175175

176176
#[inline]
177177
fn accumulate_row(&self, place: AggrState, columns: ProjectedBlock, row: usize) -> Result<()> {
178-
self.inner
178+
self.nested
179179
.accumulate_row(place.remove_last_loc(), columns, row)?;
180180
set_flag(place, true);
181181
Ok(())
182182
}
183183

184184
fn serialize_type(&self) -> Vec<StateSerdeItem> {
185-
self.inner
185+
self.nested
186186
.serialize_type()
187187
.into_iter()
188188
.chain(Some(StateSerdeItem::DataType(DataType::Boolean)))
@@ -191,7 +191,7 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor {
191191

192192
fn serialize(&self, place: AggrState, builders: &mut [ColumnBuilder]) -> Result<()> {
193193
let n = builders.len();
194-
debug_assert_eq!(self.inner.serialize_type().len() + 1, n);
194+
debug_assert_eq!(self.nested.serialize_type().len() + 1, n);
195195

196196
let flag = get_flag(place);
197197
builders
@@ -200,13 +200,33 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor {
200200
.unwrap()
201201
.push(flag);
202202

203-
self.inner
203+
self.nested
204204
.serialize(place.remove_last_loc(), &mut builders[..n - 1])
205205
}
206206

207+
/// Serializes the states at `places` column-wise: first one flag per place, then the
/// nested function's state for all places.
///
/// The last entry of `builders` receives `get_flag(..)` for every place, in slice order.
/// The nested function is then called once with `loc` and `builders` stripped of their
/// last entry, mirroring what `serialize` does for a single place.
///
/// # Panics
///
/// Panics (via `unwrap`) if `builders` is empty or its last entry is not a boolean
/// builder. `loc.len() - 1` assumes `loc` is non-empty.
fn batch_serialize(
208+
&self,
209+
places: &[StateAddr],
210+
loc: &[AggrStateLoc],
211+
builders: &mut [ColumnBuilder],
212+
) -> Result<()> {
213+
let n = builders.len();
214+
// Debug-only check: the nested function's builders plus one flag builder.
debug_assert_eq!(self.nested.serialize_type().len() + 1, n);
215+
let flag_builder = builders
216+
.last_mut()
217+
.and_then(ColumnBuilder::as_boolean_mut)
218+
.unwrap();
219+
// Push all flags first; the nested states are appended afterwards in one batch call.
for place in places {
220+
let place = AggrState::new(*place, loc);
221+
flag_builder.push(get_flag(place));
222+
}
223+
// The nested function sees neither the last `loc` entry nor the flag builder.
self.nested
224+
.batch_serialize(places, &loc[..loc.len() - 1], &mut builders[..(n - 1)])
225+
}
226+
207227
fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()> {
208228
merge_flag(place, *data.last().and_then(ScalarRef::as_boolean).unwrap());
209-
self.inner
229+
self.nested
210230
.merge(place.remove_last_loc(), &data[..data.len() - 1])?;
211231
Ok(())
212232
}
@@ -233,7 +253,7 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor {
233253
}
234254
}
235255
let inner_state = Column::Tuple(tuple[0..tuple.len() - 1].to_vec()).into();
236-
self.inner
256+
self.nested
237257
.batch_merge(places, &loc[0..loc.len() - 1], &inner_state, filter)?;
238258
}
239259
_ => {
@@ -262,7 +282,7 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor {
262282
}
263283

264284
fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()> {
265-
self.inner
285+
self.nested
266286
.merge_states(place.remove_last_loc(), rhs.remove_last_loc())?;
267287
let flag = get_flag(place) || get_flag(rhs);
268288
set_flag(place, flag);
@@ -275,9 +295,9 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor {
275295
if !get_flag(place) {
276296
inner_mut.push_null();
277297
} else if self.inner_nullable {
278-
self.inner.merge_result(place.remove_last_loc(), builder)?;
298+
self.nested.merge_result(place.remove_last_loc(), builder)?;
279299
} else {
280-
self.inner
300+
self.nested
281301
.merge_result(place.remove_last_loc(), &mut inner_mut.builder)?;
282302
inner_mut.validity.push(true);
283303
}
@@ -293,21 +313,21 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor {
293313
params: Vec<Scalar>,
294314
arguments: Vec<DataType>,
295315
) -> Result<Option<AggregateFunctionRef>> {
296-
self.inner
316+
self.nested
297317
.get_own_null_adaptor(nested_function, params, arguments)
298318
}
299319

300320
fn need_manual_drop_state(&self) -> bool {
301-
self.inner.need_manual_drop_state()
321+
self.nested.need_manual_drop_state()
302322
}
303323

304324
unsafe fn drop_state(&self, place: AggrState) {
305-
self.inner.drop_state(place.remove_last_loc())
325+
self.nested.drop_state(place.remove_last_loc())
306326
}
307327
}
308328

309329
impl fmt::Display for AggregateFunctionOrNullAdaptor {
310330
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
311-
write!(f, "{}", self.inner)
331+
write!(f, "{}", self.nested)
312332
}
313333
}

src/query/functions/src/aggregates/aggregate_combinator_if.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,29 @@ impl AggregateFunction for AggregateIfCombinator {
163163
self.nested.serialize(place, builders)
164164
}
165165

166+
/// Serializes the states at `places` by forwarding the call, with all arguments
/// unchanged, to `self.nested`.
fn batch_serialize(
167+
&self,
168+
places: &[StateAddr],
169+
loc: &[AggrStateLoc],
170+
builders: &mut [ColumnBuilder],
171+
) -> Result<()> {
172+
self.nested.batch_serialize(places, loc, builders)
173+
}
174+
166175
/// Merges `data` into the state at `place` by forwarding to `self.nested`.
fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()> {
167176
self.nested.merge(place, data)
168177
}
169178

179+
/// Batch-merges `state` into the states at `places` by forwarding the call, with all
/// arguments (including the optional `filter`) unchanged, to `self.nested`.
fn batch_merge(
180+
&self,
181+
places: &[StateAddr],
182+
loc: &[AggrStateLoc],
183+
state: &databend_common_expression::BlockEntry,
184+
filter: Option<&Bitmap>,
185+
) -> Result<()> {
186+
self.nested.batch_merge(places, loc, state, filter)
187+
}
188+
170189
/// Merges the state `rhs` into the state at `place` by forwarding to `self.nested`.
fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()> {
171190
self.nested.merge_states(place, rhs)
172191
}

src/query/functions/src/aggregates/aggregate_combinator_state.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,15 @@ impl AggregateFunction for AggregateStateCombinator {
117117
self.nested.serialize(place, builders)
118118
}
119119

120+
/// Serializes the states at `places` by forwarding the call, with all arguments
/// unchanged, to `self.nested`.
fn batch_serialize(
121+
&self,
122+
places: &[StateAddr],
123+
loc: &[AggrStateLoc],
124+
builders: &mut [ColumnBuilder],
125+
) -> Result<()> {
126+
self.nested.batch_serialize(places, loc, builders)
127+
}
128+
120129
/// Merges `data` into the state at `place` by forwarding to `self.nested`.
fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()> {
121130
self.nested.merge(place, data)
122131
}

0 commit comments

Comments
 (0)