# Log Processing

The source plugins enable the ingestion of system and application logs in many environments, particularly Linux-based systems. Many plugins are available for filtering, transforming, and aggregating logs before they are formatted and shipped to a data sink. If no built-in plugin addresses your needs, let the developers know. Better yet, develop a custom plugin and submit it for consideration as a built-in. A few built-in plugins, and the ways they can be used, are worth special mention.

## Journal

LogBus queries journald for new log messages at a user-defined rate. If the LogBus process is restarted, it resumes from the point it last queried. You will probably want to normalize the journald fields into a schema that makes sense for your organization. Here is an example of how TFKS does that using the keep plugin:

```yaml
xform-journal:
  # {
  #   "NET_TFKS_TAGS": "lb",
  #   "CONTAINER_ID" : "0465baff7180",
  #   "CONTAINER_ID_FULL" : "0465baff71803614d2f779efc6f06106423119e4e5cc3fc6d6bdc15dd10677d8",
  #   "CONTAINER_NAME" : "lb",
  #   "CONTAINER_TAG" : "0465baff7180",
  #   "IMAGE_NAME" : "nginx:1.19.6",
  #   "MESSAGE" : "24.196.40.91 foo.bar.com - 1999-12-31T23:59:59+00:00 \"GET / HTTP/2.0\" 200 0 0.028 \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15\"",
  #   "PRIORITY" : "6",
  #   "SYSLOG_IDENTIFIER" : "0465baff7180",
  #   "_BOOT_ID" : "750906d022894f61b363100559246218",
  #   "_CAP_EFFECTIVE" : "3fffffffff",
  #   "_CMDLINE" : "/usr/bin/dockerd -H fd:// --containerd=/run/containerd/containerd.sock",
  #   "_COMM" : "dockerd",
  #   "_EXE" : "/usr/bin/dockerd",
  #   "_GID" : "0",
  #   "_HOSTNAME" : "abc123",
  #   "_MACHINE_ID" : "067df0c6ee48459fa3d1d7e99bb76cfa",
  #   "_PID" : "7561",
  #   "_SELINUX_CONTEXT" : "unconfined\n",
  #   "_SOURCE_REALTIME_TIMESTAMP" : "1624486039358678",
  #   "_SYSTEMD_CGROUP" : "/system.slice/docker.service",
  #   "_SYSTEMD_INVOCATION_ID" : "4566f04ca28d420787f4db01546b4105",
  #   "_SYSTEMD_SLICE" : "system.slice",
  #   "_SYSTEMD_UNIT" : "docker.service",
  #   "_TRANSPORT" : "journal",
  #   "_UID" : "0",
  #   "__CURSOR" : "s=656eb254cada41b79b12dc5665c9d250;i=191b5f;b=750906d022894f61b363100559246218;m=9415b575426;t=5c57621518d6a;x=a6e1d0a0822d171e",
  #   "__MONOTONIC_TIMESTAMP" : "10176309974054",
  #   "__REALTIME_TIMESTAMP" : "1624486039358826",
  # }
  module: keep
  config:
    fields:
      message: MESSAGE
      ts: ["_SOURCE_REALTIME_TIMESTAMP", "__REALTIME_TIMESTAMP"]
      hostname: ["_HOSTNAME", "_MACHINE_ID"]
      severity: PRIORITY
      process: ["_COMM", "SYSLOG_IDENTIFIER"]
      unit: _SYSTEMD_UNIT
      exe: _EXE
      pid: _PID
      uid: _UID
      gid: _GID
      image_name: IMAGE_NAME
      container_name: CONTAINER_NAME
      container_id: CONTAINER_ID
      transport: _TRANSPORT
      tags: NET_TFKS_TAGS
```
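
Given the sample record shown in the comments above, the output of this stage would look something like the event below. This is a sketch, assuming the keep plugin treats a list of source fields as "use the first one that is present" (so `ts` comes from `_SOURCE_REALTIME_TIMESTAMP` and `process` from `_COMM`):

```json
{
  "message": "24.196.40.91 foo.bar.com - 1999-12-31T23:59:59+00:00 \"GET / HTTP/2.0\" 200 0 0.028 \"Mozilla/5.0 ...\"",
  "ts": "1624486039358678",
  "hostname": "abc123",
  "severity": "6",
  "process": "dockerd",
  "unit": "docker.service",
  "exe": "/usr/bin/dockerd",
  "pid": "7561",
  "uid": "0",
  "gid": "0",
  "image_name": "nginx:1.19.6",
  "container_name": "lb",
  "container_id": "0465baff7180",
  "transport": "journal",
  "tags": "lb"
}
```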

## Filtering

Handling noise is a common task. Here is how TFKS drops unprocessable logs and duplicate Docker container logs, and skips LogBus's own output to avoid a feedback loop.

```yaml
noise:
  module: js
  config:
    function: !!js/function >-
      function(event) {
        if (typeof event.message !== 'string') {
          // journald emits the message as bytes if it contains things it does not like (e.g. control characters).
          // try to work around with map(chr(i))? Or use `journalctl --all`?
          return
        }
        const msg = event.message.trim()
        if (!msg) {
          return
        }
        if (event.unit === 'tfks-logbus.service') {
          // ignore our own output to avoid feedback loop
          return
        }
        if (event.transport === 'stdout' && event.process === 'docker') {
          // the stdout of containerized systemd units is duplicated by
          // docker.service: `transport:journal && process:dockerd`
          return
        }
        // any other noise filters
        return event
      }
```
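
Additional filters can be chained in the same way. As a sketch (the `drop-debug` stage name is illustrative, and it assumes the normalized `severity` field still carries journald's numeric PRIORITY, where 7 means debug), a stage that drops debug-level messages could look like:

```yaml
drop-debug:
  module: js
  config:
    function: !!js/function >-
      function(event) {
        // journald PRIORITY follows syslog severities: 7 is debug
        if (Number(event.severity) >= 7) {
          return
        }
        return event
      }
```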

## Aggregation

One way to deal with noisy processes is to combine their log messages into a single multi-line log message. In this example, an upstream stage has marked the events whose logs should be aggregated by attaching an `_agg` object. There is nothing special about `_agg`; it is just the convention used in this pipeline (a sketch of such a marking stage follows the config below). The stage below aggregates the marked events into a single 5-second view of those logs.

```yaml
count-aggregated:
  module: agg
  config:
    maxRealSeconds: 5
    filtered: !!js/function >-
      function(event) {
        return !event._agg || !event._agg.type.startsWith('count')
      }
    start: !!js/function >-
      function(event) {
        return true // let size or time trigger aggregation
      }
    stop: !!js/function >-
      function(event) {
        return false // let size or time trigger aggregation
      }
    key: !!js/function >-
      function(event) {
        return event._agg.key
      }
    view: !!js/function >-
      function(events) {
        let event = events[0] // assuming grouped events are similar enough
        event.end = events[events.length-1].ts
        event.aggregation = 'count'
        event.duration = event.end - event.ts // milliseconds
        event.count = events.length
        if (event.error) {
          event.message = `${event.count}x: ${event.message}`
        } else if (event._agg.type === 'count-process') {
          event.message = events.map(i => i.message).join('\n')
        } else {
          // assume original message in aggregation key.
          event.message = `${event.count}x: ${event._agg.key}`
        }
        delete event._agg
        return event
      }
```
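
The upstream marking stage is not shown above. A sketch of what it might look like follows; the `mark-noisy` stage name, the `noisyd` process match, and the exact `_agg` fields are illustrative, not part of the built-in plugins:

```yaml
mark-noisy:
  module: js
  config:
    function: !!js/function >-
      function(event) {
        // tag logs from a known-noisy process so the agg stage groups them
        if (event.process === 'noisyd') {
          event._agg = {
            type: 'count-process',
            key: `${event.hostname}:${event.process}`
          }
        }
        return event
      }
```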