# agg

Collects events into buckets and emits one aggregated event per bucket when the bucket completes. A key function groups events, a filter function selects which events are aggregated at all, and a view function turns each completed bucket into an output event. Buckets are completed by size limits, time spans, or custom start/stop markers.

# Config

Required:

  • filtered: Function that returns true for events that should NOT be aggregated
  • key: Function that returns the grouping key for an event
  • view: Function that converts an array of similarly keyed events into a single aggregated event
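
As a rough illustration of how the three required functions divide the work, the sketch below defines them as plain JavaScript and applies them to a small batch by hand. The event shape (`level`, `message`, `ts`) and the helper `aggregateBatch` are hypothetical. The sketch assumes that filtered events are forwarded unchanged, and it ignores the size and time triggers entirely.

```javascript
// Hypothetical config: aggregate repeated warnings, pass everything else through.
const config = {
  // true means "do NOT aggregate this event"
  filtered: (event) => event.level !== 'warn',
  // events with the same key land in the same bucket
  key: (event) => event.message,
  // collapse one bucket of similar events into a single event
  view: (events) => ({ ...events[0], count: events.length }),
};

// Hypothetical helper: one pass over a batch, no size or time triggers.
function aggregateBatch(events, { filtered, key, view }) {
  const passThrough = [];
  const buckets = new Map(); // a Map keeps keys in insertion order
  for (const event of events) {
    if (filtered(event)) {
      passThrough.push(event); // assumed: forwarded unchanged
      continue;
    }
    const k = key(event);
    if (!buckets.has(k)) buckets.set(k, []);
    buckets.get(k).push(event);
  }
  return [...passThrough, ...[...buckets.values()].map(view)];
}

const out = aggregateBatch(
  [
    { level: 'warn', message: 'disk low', ts: 1 },
    { level: 'info', message: 'started', ts: 2 },
    { level: 'warn', message: 'disk low', ts: 3 },
  ],
  config,
);
```

Here `out` holds the untouched `info` event followed by one `warn` event with `count: 2`.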

Optional:

  • tsField: Name of the timestamp field (default: ts)
  • start: Function that returns true when an event should start a new bucket
  • stop: Function that returns true when an event should complete a bucket
  • maxSize (number | function): Maximum number of events per bucket (default: 1000)
  • maxEventSeconds (number | function): Maximum time span, in seconds, between the first and last event in a bucket (default: 300)
  • maxRealSeconds (number | function): Interval, in seconds, at which all open buckets are flushed (default: 300)
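
The interplay of the optional triggers can be modeled roughly as follows. This is a hedged sketch of the behavior described above, not the module's implementation. The class `AggModel` and its methods are hypothetical, and so are several behaviors it adopts:

  • Function-valued limits are called with the current bucket.
  • Timestamps are epoch milliseconds.
  • start defaults to always true and stop to always false.
  • start only matters when no bucket is open for the event's key.
  • An event that cannot open a bucket is forwarded unchanged.

```javascript
// Hypothetical model of agg's bucket lifecycle; assumptions are marked inline.
class AggModel {
  constructor(opts, emit) {
    this.opts = {
      tsField: 'ts',
      start: () => true,  // assumed default: any event may open a bucket
      stop: () => false,  // assumed default: only limits complete a bucket
      maxSize: 1000,
      maxEventSeconds: 300,
      ...opts,
    };
    this.emit = emit;
    this.buckets = new Map(); // key -> array of events (one open bucket per key)
  }

  // Assumption: function-valued limits receive the current bucket.
  limit(name, bucket) {
    const value = this.opts[name];
    return typeof value === 'function' ? value(bucket) : value;
  }

  push(event) {
    const { filtered, key, start, stop, view, tsField } = this.opts;
    if (filtered(event)) {
      this.emit(event); // not aggregated
      return;
    }
    const k = key(event);
    let bucket = this.buckets.get(k);
    if (!bucket) {
      if (!start(event)) {
        this.emit(event); // assumption: cannot open a bucket, forward as-is
        return;
      }
      bucket = [];
      this.buckets.set(k, bucket);
    }
    bucket.push(event);
    const spanSeconds = (event[tsField] - bucket[0][tsField]) / 1000; // ts in ms
    if (
      stop(event) ||
      bucket.length >= this.limit('maxSize', bucket) ||
      spanSeconds >= this.limit('maxEventSeconds', bucket)
    ) {
      this.buckets.delete(k);
      this.emit(view(bucket));
    }
  }

  // Meant to be called every maxRealSeconds (e.g. from setInterval).
  flushAll() {
    for (const bucket of this.buckets.values()) this.emit(this.opts.view(bucket));
    this.buckets.clear();
  }
}

// Usage: a size limit of 2, then a timer-style flush of the remainder.
const emitted = [];
const agg = new AggModel(
  {
    filtered: () => false,
    key: (e) => e.msg,
    view: (events) => ({ msg: events[0].msg, count: events.length }),
    maxSize: 2,
  },
  (e) => emitted.push(e),
);
agg.push({ msg: 'a', ts: 0 });
agg.push({ msg: 'a', ts: 1000 }); // size limit reached: emits { msg: 'a', count: 2 }
agg.push({ msg: 'b', ts: 2000 });
agg.flushAll();                   // emits { msg: 'b', count: 1 }
```

Keying open buckets in a `Map` lets events with different keys accumulate independently. `flushAll` models maxRealSeconds as an external timer because a wall-clock interval is independent of the event timestamps that maxEventSeconds compares.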

# Example

This example assumes that an upstream stage has marked the events to be aggregated with an `_agg` object that carries a `type` (such as `count` or `count-process`) and a grouping `key`.
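
For context, an upstream stage that produces these markers might look like the hypothetical function below. The example config reads the `_agg.type` and `_agg.key` fields. The function name `markForAgg`, its matching rules, and the key format are illustrative assumptions only.

```javascript
// Hypothetical upstream stage: tag events that the agg stage should count.
function markForAgg(event) {
  if (event.process) {
    // Output from the same process is grouped and its messages are joined.
    event._agg = { type: 'count-process', key: `process:${event.process}` };
  } else if (/^connection reset/i.test(event.message)) {
    // Repeated messages are grouped by their text, which view reuses.
    event._agg = { type: 'count', key: event.message };
  }
  return event; // untagged events are later filtered out by agg
}

// The example's filtered function: true means "do not aggregate".
const filtered = (event) => !event._agg?.type?.startsWith('count');
```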

```yaml
pipeline:
  count-aggregated:
    module: agg
    config:
      maxSize: 1000
      filtered: !!js/function >-
        function(event) {
          return !event._agg?.type?.startsWith('count')
        }
      start: !!js/function >-
        function(event) {
          return true // let size or time trigger aggregation
        }
      stop: !!js/function >-
        function(event) {
          return false // let size or time trigger aggregation
        }
      key: !!js/function >-
        function(event) {
          return event._agg.key
        }
      view: !!js/function >-
        function(events) {
          const event = events[0] // assuming grouped events are similar enough
          event.end = events[events.length - 1].ts
          event.aggregation = 'count'
          event.duration = event.end - event.ts // milliseconds
          event.count = events.length
          if (event.error) {
            event.message = `${event.count}x: ${event.message}`
          } else if (event._agg.type === 'count-process') {
            event.message = events.map(i => i.message).join('\n')
          } else {
            // assume original message in aggregation key
            event.message = `${event.count}x: ${event._agg.key}`
          }
          delete event._agg
          return event
        }
```
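
To see what the example's view produces, the function can be exercised outside the pipeline. The sample bucket below is made up. It uses epoch-millisecond `ts` values, which is what the `// milliseconds` comment on `duration` implies.

```javascript
// The view function from the example config, copied as plain JavaScript.
function view(events) {
  const event = events[0]; // assuming grouped events are similar enough
  event.end = events[events.length - 1].ts;
  event.aggregation = 'count';
  event.duration = event.end - event.ts; // milliseconds
  event.count = events.length;
  if (event.error) {
    event.message = `${event.count}x: ${event.message}`;
  } else if (event._agg.type === 'count-process') {
    event.message = events.map((i) => i.message).join('\n');
  } else {
    // assume original message in aggregation key
    event.message = `${event.count}x: ${event._agg.key}`;
  }
  delete event._agg;
  return event;
}

// Hypothetical bucket: three identical messages over two seconds.
const bucket = [0, 1000, 2000].map((offset) => ({
  ts: 1700000000000 + offset,
  message: 'Connection reset by peer',
  _agg: { type: 'count', key: 'Connection reset by peer' },
}));
const result = view(bucket);
// result.message === '3x: Connection reset by peer', result.duration === 2000
```

Note that view modifies and returns the first event of the bucket rather than building a new object. This is cheap, but it means the bucket's first element is changed in place.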