# Task Runner

Using some of the built-in plugins and a relational database, LogBus can act as a simple distributed task runner. It is a bit of a toy example, but it might be useful in a production setting where a full-blown task runner system is overkill. At the very least, it demonstrates how LogBus can be used to quickly prototype solutions.

A PostgreSQL table for managing tasks and saving their output:

```sql
CREATE TYPE public.task_status AS ENUM ('queued', 'running', 'succeeded', 'failed', 'dead');
CREATE TABLE public.tasks (
    id bigserial NOT NULL,
    key text UNIQUE NOT NULL,
    queue text DEFAULT 'default'::text NOT NULL,
    status task_status DEFAULT 'queued' NOT NULL,
    priority integer DEFAULT 0 NOT NULL,
    payload jsonb DEFAULT '{}'::jsonb NOT NULL,
    results jsonb DEFAULT '{}'::jsonb NOT NULL,
    error text DEFAULT '' NOT NULL,
    timeout text DEFAULT '15 minutes' NOT NULL,
    retries integer DEFAULT 3 NOT NULL,
    start timestamptz DEFAULT now() NOT NULL,
    worker text,
    locked timestamptz,
    created timestamptz DEFAULT now() NOT NULL,
    updated timestamptz DEFAULT now() NOT NULL,
    CONSTRAINT nonrunning_requires_no_lock CHECK (((status = 'running') OR ((locked IS NULL) AND (worker IS NULL)))),
    CONSTRAINT running_requires_lock CHECK (((status <> 'running') OR ((locked IS NOT NULL) AND (worker IS NOT NULL))))
);
```
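The two CHECK constraints encode one lock invariant. A `running` row must have both `locked` and `worker` set, and any other row must have both of them NULL. A row with only one of the two set is rejected whatever its status. The sketch below mirrors that invariant in JavaScript so that code building task rows can be checked before it reaches the database. The helper name `satisfiesLockConstraints` is hypothetical and is not part of LogBus or PostgreSQL.

```javascript
// Hypothetical helper: mirrors the two CHECK constraints on public.tasks
// for a task row represented as a plain object.
function satisfiesLockConstraints(task) {
  const hasLock = task.locked != null && task.worker != null;
  const hasNoLock = task.locked == null && task.worker == null;
  // nonrunning_requires_no_lock: status = 'running' OR (locked IS NULL AND worker IS NULL)
  const nonrunningOk = task.status === 'running' || hasNoLock;
  // running_requires_lock: status <> 'running' OR (locked IS NOT NULL AND worker IS NOT NULL)
  const runningOk = task.status !== 'running' || hasLock;
  return nonrunningOk && runningOk;
}

console.log(satisfiesLockConstraints({ status: 'queued', locked: null, worker: null }));          // true
console.log(satisfiesLockConstraints({ status: 'running', locked: new Date(), worker: 'h1' }));     // true
console.log(satisfiesLockConstraints({ status: 'succeeded', locked: new Date(), worker: 'h1' }));   // false
console.log(satisfiesLockConstraints({ status: 'running', locked: null, worker: null }));         // false
```

In the database, the status column is `NOT NULL` and the other checks use `IS [NOT] NULL`. Neither constraint can therefore evaluate to SQL NULL, so the plain boolean logic above matches.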

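A simple pipeline demonstrating how to compose a task runner. The six-field cron expression `*/33 * * * * *` fires at seconds 0 and 33 of every minute. On each tick, this LogBus instance claims at most one queued "foo" task, works on it, and records its results. The claim query uses `FOR UPDATE SKIP LOCKED`, so several LogBus instances can poll the same table without claiming the same task. Another scheduler → sql flow could be used to retry stalled or failed tasks, and a scheduler → sql → js → send-email flow could be used for reporting.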
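The retry flow is only named, not shown. The sketch below gives one possible policy, written in JavaScript over plain objects rather than SQL so the logic is easy to follow. A running task counts as stalled once its `locked` time is older than its timeout. A stalled or failed task is requeued while `retries` remain and marked `dead` otherwise. Several pieces are assumptions and do not come from LogBus: the name `reapStalled`, the numeric `timeoutMs` field (a stand-in for the text `timeout` column), epoch-millisecond timestamps, and the rule that each requeue consumes one retry. In a real deployment, this logic would likely live in the query of a `sql` stage.

```javascript
// Hypothetical reaper policy for a "scheduler → sql" retry flow.
// Assumptions: timestamps are epoch milliseconds, timeoutMs stands in for
// the text `timeout` column, and each requeue consumes one of `retries`.
function reapStalled(tasks, now) {
  return tasks.map((task) => {
    const stalled =
      task.status === 'running' &&
      task.locked != null &&
      now - task.locked > task.timeoutMs;
    if (!stalled && task.status !== 'failed') {
      return task; // healthy or finished: leave unchanged
    }
    // Clearing locked/worker keeps the row valid under the CHECK constraints.
    const released = { ...task, locked: null, worker: null, updated: now };
    if (task.retries > 0) {
      return { ...released, status: 'queued', retries: task.retries - 1 };
    }
    return { ...released, status: 'dead' };
  });
}

const fifteenMinutes = 15 * 60 * 1000;
const now = Date.parse('2024-01-01T00:20:00Z');
const tasks = [
  { id: 1, status: 'running', locked: Date.parse('2024-01-01T00:00:00Z'), worker: 'host-a', timeoutMs: fifteenMinutes, retries: 3 },
  { id: 2, status: 'running', locked: Date.parse('2024-01-01T00:19:00Z'), worker: 'host-b', timeoutMs: fifteenMinutes, retries: 3 },
  { id: 3, status: 'failed', locked: null, worker: null, timeoutMs: fifteenMinutes, retries: 0 },
];
console.log(reapStalled(tasks, now).map((t) => `${t.id}:${t.status}`).join(' '));
// 1:queued 2:running 3:dead
```

The function returns new objects and does not mutate its input. This makes the policy easy to unit-test separately from the database.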

```yaml
pipeline:
  check-for-tasks:
    module: scheduler
    config:
      cron: "*/33 * * * * *"
      payload: {}
  claim-foo-task:
    module: sql
    inputs: [check-for-tasks]
    config:
      driver: 'npm:pg@8'
      connection: !!js/function >-
        () => `postgres://localhost/bussin`
      query: !!js/function >-
        function(event, db, SQL) {
          return SQL(`
            WITH picked AS (
              SELECT id FROM tasks WHERE
                status = 'queued' AND
                queue = 'default' AND
                key LIKE 'foo-%' AND
                start <= now()
              ORDER BY priority DESC, start, id
              LIMIT 1
              FOR UPDATE SKIP LOCKED)
            UPDATE tasks t SET
              status = 'running',
              locked = now(),
              worker = '${this.hostname}',
              updated = now()
            FROM picked WHERE
              t.id = picked.id
            RETURNING t.*;
          `)
        }
  foo-task:
    module: js
    inputs: [claim-foo-task]
    config:
      function: !!js/function >-
        function(task) {
          task.results.foo = 'bar'
          return task
        }
  finish-task:
    module: sql
    inputs: [foo-task]
    config:
      driver: 'npm:pg@8'
      connection: !!js/function >-
        () => `postgres://localhost/bussin`
      query: !!js/function >-
        function(task, db, SQL) {
          // Double any single quotes so the JSON stays a valid SQL string literal.
          const results = JSON.stringify(task.results).replace(/'/g, "''")
          return SQL(`
            UPDATE tasks SET
              status = 'succeeded'
            , results = '${results}'::jsonb
            , locked = NULL
            , worker = NULL
            , updated = now()
            WHERE id = ${task.id}
            RETURNING *;
          `)
        }
  log:
    inputs: [finish-task]
```
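The following is a minimal in-memory model in JavaScript of the `claim-foo-task` query. It shows which row that query picks. The model covers the filter (`queued`, queue `default`, key starting with `foo-`, `start` not in the future), the `ORDER BY priority DESC, start, id` ordering, `LIMIT 1`, and the update that marks the row as running. The helper name `claimNext` is hypothetical. The model deliberately leaves out concurrency, which `FOR UPDATE SKIP LOCKED` handles in PostgreSQL.

```javascript
// Hypothetical in-memory model of the claim-foo-task query.
// It does not model row locking; FOR UPDATE SKIP LOCKED covers that in PostgreSQL.
function claimNext(tasks, worker, now) {
  const candidates = tasks.filter(
    (t) =>
      t.status === 'queued' &&
      t.queue === 'default' &&
      t.key.startsWith('foo-') && // same as LIKE 'foo-%'
      t.start <= now
  );
  // ORDER BY priority DESC, start, id
  candidates.sort(
    (a, b) => b.priority - a.priority || a.start - b.start || a.id - b.id
  );
  const picked = candidates[0]; // LIMIT 1
  if (!picked) return null;
  // UPDATE ... SET status = 'running', locked = now(), worker = <hostname>
  Object.assign(picked, { status: 'running', locked: now, worker });
  return picked;
}

const now = 1000;
const tasks = [
  { id: 1, key: 'foo-a', queue: 'default', status: 'queued', priority: 0, start: 100 },
  { id: 2, key: 'foo-b', queue: 'default', status: 'queued', priority: 5, start: 900 },
  { id: 3, key: 'bar-c', queue: 'default', status: 'queued', priority: 9, start: 100 },
  { id: 4, key: 'foo-d', queue: 'default', status: 'queued', priority: 5, start: 2000 },
];
console.log(claimNext(tasks, 'host-a', now).id); // 2
console.log(claimNext(tasks, 'host-a', now).id); // 1
console.log(claimNext(tasks, 'host-a', now));    // null
```

Task 3 is never picked despite its higher priority, because its key does not match `foo-%`. Task 4 waits until its `start` time has passed.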