# Task Runner
Using some of the built-in plugins and a relational database, LogBus can act as a simple distributed task runner. It is a bit of a toy example, but it might be useful in a production setting where a full-blown task runner system is overkill. At the very least, it demonstrates how LogBus can be used to quickly prototype solutions.
A PostgreSQL table for managing tasks and saving their output:
```sql
CREATE TYPE public.task_status AS ENUM ('queued', 'running', 'succeeded', 'failed', 'dead');

CREATE TABLE public.tasks (
  id bigserial NOT NULL,
  key text UNIQUE NOT NULL,
  queue text DEFAULT 'default'::text NOT NULL,
  status task_status DEFAULT 'queued' NOT NULL,
  priority integer DEFAULT 0 NOT NULL,
  payload jsonb DEFAULT '{}'::jsonb NOT NULL,
  results jsonb DEFAULT '{}'::jsonb NOT NULL,
  error text DEFAULT '' NOT NULL,
  timeout text DEFAULT '15 minutes' NOT NULL,
  retries integer DEFAULT 3 NOT NULL,
  start timestamptz DEFAULT now() NOT NULL,
  worker text,
  locked timestamptz,
  created timestamptz DEFAULT now() NOT NULL,
  updated timestamptz DEFAULT now() NOT NULL,
  CONSTRAINT nonrunning_requires_no_lock CHECK (((status = 'running') OR ((locked IS NULL) AND (worker IS NULL)))),
  CONSTRAINT running_requires_lock CHECK (((status <> 'running') OR ((locked IS NOT NULL) AND (worker IS NOT NULL))))
);
```
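Tasks have to be inserted into this table before any worker can claim them. As a rough sketch, the helper below builds a parameterized INSERT in the `{ text, values }` query-config shape that node-postgres accepts. The name `enqueueTask`, its `options` argument, and the example key are hypothetical and not part of LogBus; `ON CONFLICT (key) DO NOTHING` is one possible way to use the UNIQUE constraint on `key` so that enqueueing the same task twice is a no-op.

```javascript
// Hypothetical helper (not part of LogBus): builds a parameterized INSERT
// for the tasks table as a { text, values } query-config object.
function enqueueTask(key, payload = {}, options = {}) {
  // Defaults mirror the column defaults in the table definition.
  const { queue = 'default', priority = 0, start = null } = options
  return {
    text: `
      INSERT INTO tasks (key, queue, priority, payload, start)
      VALUES ($1, $2, $3, $4::jsonb, COALESCE($5::timestamptz, now()))
      ON CONFLICT (key) DO NOTHING
      RETURNING id`,
    // The payload is serialized here and cast to jsonb by the query.
    values: [key, queue, priority, JSON.stringify(payload), start],
  }
}

// Example: a task whose key carries a "foo-" prefix.
const query = enqueueTask('foo-example-1', { input: 'example' }, { priority: 5 })
```

With node-postgres, the resulting object could be passed to `client.query(query)`. Passing values as parameters rather than interpolating them keeps quoting out of the query text.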
A simple pipeline demonstrating how to compose a task runner. About every 33 seconds (with standard cron step semantics, "*/33" in the seconds field fires at seconds 0 and 33 of each minute), the highest-priority queued "foo" task is claimed by this LogBus instance and worked on. Another flow of scheduler → sql could be used to retry stalled or failed tasks. A flow of scheduler → sql → js → send-email could be used for reporting.
```yaml
pipeline:
  check-for-tasks:
    config:
      cron: "*/33 * * * * *"
      payload: {}
  claim-foo-task:
    module: sql
    inputs: [check-for-tasks]
    config:
      driver: 'npm:pg@8'
      connection: !!js/function >-
        () => `postgres://localhost/bussin`
      query: !!js/function >-
        function(event, db, SQL) {
          return SQL(`
            WITH picked AS (
              SELECT id FROM tasks WHERE
                status = 'queued' AND
                queue = 'default' AND
                key LIKE 'foo-%' AND
                start <= now()
              ORDER BY priority DESC, start, id
              FOR UPDATE SKIP LOCKED LIMIT 1)
            UPDATE tasks t SET
              status = 'running',
              locked = now(),
              worker = '${this.hostname}'
            FROM picked WHERE
              t.id = picked.id
            RETURNING t.*;
          `)
        }
  foo-task:
    module: js
    inputs: [claim-foo-task]
    config:
      function: !!js/function >-
        function(task) {
          task.results.foo = 'bar'
          return task
        }
  finish-task:
    module: sql
    inputs: [foo-task]
    config:
      driver: 'npm:pg@8'
      connection: !!js/function >-
        () => `postgres://localhost/bussin`
      query: !!js/function >-
        function(task, db, SQL) {
          // Double any single quotes so the JSON stays a valid SQL string literal.
          const results = JSON.stringify(task.results).replace(/'/g, "''")
          return SQL(`
            UPDATE tasks SET
              status = 'succeeded'
              , results = '${results}'::jsonb
              , locked = NULL
              , worker = NULL
            WHERE id = ${task.id}
            RETURNING *;
          `)
        }
  log:
    inputs: [finish-task]
```
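The retry flow mentioned earlier (scheduler → sql) could use a query along the following lines. This is only a sketch: it assumes that `retries` counts the remaining attempts, that `'dead'` marks a task that has exhausted them, and that a running task is stalled once `locked + timeout` has passed. None of these semantics are confirmed by the table definition itself. The function name `requeueStalled` and the `identity` stand-in for the `SQL` helper are hypothetical.

```javascript
// Sketch of a `query` function for a hypothetical "requeue-stalled" sql stage.
// Assumptions: `retries` is the number of attempts left, and 'dead' means
// no attempts remain.
function requeueStalled(event, db, SQL) {
  return SQL(`
    UPDATE tasks SET
      status = CASE WHEN retries > 0
        THEN 'queued'::task_status
        ELSE 'dead'::task_status END,
      retries = GREATEST(retries - 1, 0),
      locked = NULL,
      worker = NULL,
      updated = now()
    WHERE status = 'running'
      AND locked + timeout::interval < now()
    RETURNING *;
  `)
}

// Stand-in for the SQL helper that LogBus passes in, so the function can be
// exercised without a database; it simply returns the query text.
const identity = (text) => text
const sql = requeueStalled({}, null, identity)
```

Clearing `locked` and `worker` in the same statement keeps the row consistent with the `nonrunning_requires_no_lock` constraint, since the task leaves the `running` state.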