#
agg
Collect events into buckets and emit aggregated events when conditions are met. It supports flexible grouping via key functions, filtering to select which events to aggregate, and custom view functions to transform buckets into output events. Buckets can be completed by size limits, time spans, or custom start/stop markers.
#
Config
Required:
- `filtered`: Function that returns true for events that should NOT be aggregated
- `key`: Function that returns the grouping key for an event
- `view`: Function that turns an array of similarly keyed events into a single aggregated event
Optional:
- `tsField`: Timestamp field name, defaults to `ts`
- `start`: Function that returns true when an event should start a new bucket
- `stop`: Function that returns true when an event should complete a bucket
- `maxSize` (number | function): Maximum events per bucket (default: 1000)
- `maxEventSeconds` (number | function): Maximum time span between first and last event in seconds (default: 300)
- `maxRealSeconds` (number | function): Interval for flushing all buckets in seconds (default: 300)
#
Example
This example assumes that an upstream stage has marked each event that should be aggregated by attaching an `_agg` object (with `type` and `key` properties) to it.
pipeline:
  count-aggregated:
    module: agg
    config:
      maxSize: 1000
      # NOTE: every function below is a folded block scalar (`>-`). Keep the
      # body lines indented deeper than the `function` line so that their line
      # breaks are preserved; otherwise the lines are folded together and a
      # trailing `//` comment swallows the code that follows it.

      # Return true for events that must NOT be aggregated: anything without
      # an `_agg.type` that starts with 'count'.
      filtered: !!js/function >-
        function(event) {
          return !event._agg?.type?.startsWith('count')
        }
      start: !!js/function >-
        function(event) {
          return true // let size or time trigger aggregation
        }
      stop: !!js/function >-
        function(event) {
          return false // let size or time trigger aggregation
        }
      # Group events by the key prepared by the upstream stage.
      key: !!js/function >-
        function(event) {
          return event._agg.key
        }
      # Collapse a bucket into one event. The first event of the bucket is
      # reused (and mutated) as the aggregated output event.
      view: !!js/function >-
        function(events) {
          const event = events[0] // assuming grouped events are similar enough
          // Reads `ts` directly, i.e. this assumes the default `tsField`.
          event.end = events[events.length-1].ts
          event.aggregation = 'count'
          event.duration = event.end - event.ts // milliseconds
          event.count = events.length
          if (event.error) {
            event.message = `${event.count}x: ${event.message}`
          } else if (event._agg.type === 'count-process') {
            event.message = events.map(i => i.message).join('\n')
          } else {
            // assume original message in aggregation key.
            event.message = `${event.count}x: ${event._agg.key}`
          }
          // Drop the upstream marker so it does not leak into the output event.
          delete event._agg
          return event
        }