# Log Processing
The source plugins enable the ingestion of system and application logs in many environments, particularly Linux-based systems. Many plugins are available to help with filtering, transforming, and aggregating logs before formatting and shipping them to a data sink. If no built-in plugin addresses your needs, let the developers know. Even better, develop a custom plugin and submit it for consideration as a built-in. A few of the built-in plugins, and how they can be used, are worth special mention.
## Journal
LogBus queries journald at a user-defined rate for new log messages. If the LogBus process is restarted, it resumes from its last queried position (journald's cursor; note the `__CURSOR` field in the sample record below).
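As a rough sketch, a journald source stage might be configured like the following. The `journal` module name and the `intervalSeconds` option are illustrative assumptions, not documented values; consult the plugin reference for the actual options.

```yaml
# A minimal sketch of a journald source stage.
# The module name and option names are assumptions for illustration only.
journal-in:
  module: journal
  config:
    intervalSeconds: 10  # how often to poll journald for new messages
```

You will probably want to normalize the journald fields into a schema that makes sense for your organization. Here is an example of how TFKS does that using the `keep` plugin: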
```yaml
xform-journal:
# {
# "NET_TFKS_TAGS": "lb",
# "CONTAINER_ID" : "0465baff7180",
# "CONTAINER_ID_FULL" : "0465baff71803614d2f779efc6f06106423119e4e5cc3fc6d6bdc15dd10677d8",
# "CONTAINER_NAME" : "lb",
# "CONTAINER_TAG" : "0465baff7180",
# "IMAGE_NAME" : "nginx:1.19.6",
# "MESSAGE" : "24.196.40.91 foo.bar.com - 1999-12-31T23:59:59+00:00 \"GET / HTTP/2.0\" 200 0 0.028 \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15\"",
# "PRIORITY" : "6",
# "SYSLOG_IDENTIFIER" : "0465baff7180",
# "_BOOT_ID" : "750906d022894f61b363100559246218",
# "_CAP_EFFECTIVE" : "3fffffffff",
# "_CMDLINE" : "/usr/bin/dockerd -H fd:// --containerd=/run/containerd/containerd.sock",
# "_COMM" : "dockerd",
# "_EXE" : "/usr/bin/dockerd",
# "_GID" : "0",
# "_HOSTNAME" : "abc123",
# "_MACHINE_ID" : "067df0c6ee48459fa3d1d7e99bb76cfa",
# "_PID" : "7561",
# "_SELINUX_CONTEXT" : "unconfined\n",
# "_SOURCE_REALTIME_TIMESTAMP" : "1624486039358678",
# "_SYSTEMD_CGROUP" : "/system.slice/docker.service",
# "_SYSTEMD_INVOCATION_ID" : "4566f04ca28d420787f4db01546b4105",
# "_SYSTEMD_SLICE" : "system.slice",
# "_SYSTEMD_UNIT" : "docker.service",
# "_TRANSPORT" : "journal",
# "_UID" : "0",
# "__CURSOR" : "s=656eb254cada41b79b12dc5665c9d250;i=191b5f;b=750906d022894f61b363100559246218;m=9415b575426;t=5c57621518d6a;x=a6e1d0a0822d171e",
# "__MONOTONIC_TIMESTAMP" : "10176309974054",
# "__REALTIME_TIMESTAMP" : "1624486039358826",
# }
  module: keep
  config:
    fields:
      message: MESSAGE
      ts: ["_SOURCE_REALTIME_TIMESTAMP", "__REALTIME_TIMESTAMP"]
      hostname: ["_HOSTNAME", "_MACHINE_ID"]
      severity: PRIORITY
      process: ["_COMM", "SYSLOG_IDENTIFIER"]
      unit: _SYSTEMD_UNIT
      exe: _EXE
      pid: _PID
      uid: _UID
      gid: _GID
      image_name: IMAGE_NAME
      container_name: CONTAINER_NAME
      container_id: CONTAINER_ID
      transport: _TRANSPORT
      tags: NET_TFKS_TAGS
```
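Assuming the `keep` plugin treats a list of source fields as a preference order (the first field present wins; that semantic is an assumption here), the sample record above would be reduced to roughly:

```json
{
  "message": "24.196.40.91 foo.bar.com - 1999-12-31T23:59:59+00:00 \"GET / HTTP/2.0\" 200 0 0.028 \"Mozilla/5.0 ...\"",
  "ts": "1624486039358678",
  "hostname": "abc123",
  "severity": "6",
  "process": "dockerd",
  "unit": "docker.service",
  "exe": "/usr/bin/dockerd",
  "pid": "7561",
  "uid": "0",
  "gid": "0",
  "image_name": "nginx:1.19.6",
  "container_name": "lb",
  "container_id": "0465baff7180",
  "transport": "journal",
  "tags": "lb"
}
```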
## Filtering
Handling noise is a common task. Here is how TFKS drops unprocessable logs, duplicate Docker container logs, and LogBus's own log output (which would otherwise create a feedback loop):
```yaml
noise:
  module: js
  config:
    function: !!js/function >-
      function(event) {
        if (typeof event.message !== 'string') {
          // journald will emit the message as bytes if it contains things it
          // does not like (eg control characters).
          // Try to work around with map(chr(i))? Or use `journalctl --all`?
          return
        }
        const msg = event.message.trim()
        if (!msg) {
          return
        }
        if (event.unit === 'tfks-logbus.service') {
          // ignore our own output to avoid a feedback loop
          return
        }
        if (event.transport === 'stdout' && event.process === 'docker') {
          // the stdout of a containerized systemd unit is duplicated by
          // docker.service: `transport:journal && process:dockerd`
          return
        }
        // any other noise filters
        return event
      }
```
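Note the contract assumed by the `js` stage here: returning `undefined` (a bare `return`) drops the event, while returning the (possibly modified) event passes it downstream. Each early `return` above is therefore a drop.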
## Aggregation
One way to deal with a noisy process is to combine its log messages into a single multi-line log message. In this example, an upstream stage has marked the events to aggregate with an `_agg` object (a sketch of such a stage follows the example). There is nothing special about `_agg` - it's just the convention used in this pipeline. This stage aggregates the marked events into a single 5-second view of those logs.
```yaml
count-aggregated:
  module: agg
  config:
    maxRealSeconds: 5
    filtered: !!js/function >-
      function(event) {
        return !event._agg || !event._agg.type.startsWith('count')
      }
    start: !!js/function >-
      function(event) {
        return true // let size or time trigger aggregation
      }
    stop: !!js/function >-
      function(event) {
        return false // let size or time trigger aggregation
      }
    key: !!js/function >-
      function(event) {
        return event._agg.key
      }
    view: !!js/function >-
      function(events) {
        let event = events[0] // assuming grouped events are similar enough
        event.end = events[events.length - 1].ts
        event.aggregation = 'count'
        event.duration = event.end - event.ts // milliseconds
        event.count = events.length
        if (event.error) {
          event.message = `${event.count}x: ${event.message}`
        } else if (event._agg.type === 'count-process') {
          event.message = events.map(i => i.message).join('\n')
        } else {
          // assume the original message is in the aggregation key
          event.message = `${event.count}x: ${event._agg.key}`
        }
        delete event._agg
        return event
      }
```
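For reference, an upstream marking stage could look like the sketch below. The stage name, the unit being matched, and the keying scheme are illustrative assumptions; only the shape of `_agg` (`type` and `key`) follows the convention consumed by `count-aggregated` above.

```yaml
mark-noisy:
  module: js
  config:
    function: !!js/function >-
      function(event) {
        // Hypothetical rule: aggregate everything from a known chatty unit.
        if (event.unit === 'noisy-app.service') {
          event._agg = {
            type: 'count-process', // passes the filtered() check above
            key: `${event.hostname}:${event.process}`, // one group per host/process
          }
        }
        return event
      }
```

Because the `type` starts with `count`, these events survive the `filtered` check, and the `count-process` branch of `view` joins their messages into one multi-line event per `hostname:process` group every 5 seconds.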