# sql

Connect to the SQL database identified by connection using the given driver, execute query, and emit each returned row as an event. When the plugin has no inputs, the query runs once at pipeline start, and the plugin then initiates pipeline shutdown.

This plugin uses Kysely to execute queries. The database driver for the selected dialect is installed at runtime. The following dialects are currently supported: PostgreSQL, MySQL, and SQLite. Other dialects may be added if there is enough demand.
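
To make the relationship between a driver specifier and a dialect concrete, here is a minimal sketch. The helper name parseDriver and the lookup table are hypothetical and are not taken from the plugin's source; only the dialect class names (PostgresDialect, MysqlDialect, SqliteDialect) are Kysely's own.

```javascript
// Hypothetical mapping from npm package name to Kysely dialect class name.
// This is an illustration of the idea, not the plugin's actual lookup.
const DIALECTS = new Map([
  ['pg', 'PostgresDialect'],
  ['mysql2', 'MysqlDialect'],
  ['better-sqlite3', 'SqliteDialect'],
])

function parseDriver(specifier) {
  // Strip the "npm:" scheme, then split off a trailing "@version" if present.
  const bare = specifier.replace(/^npm:/, '')
  const at = bare.lastIndexOf('@')
  const name = at > 0 ? bare.slice(0, at) : bare
  const version = at > 0 ? bare.slice(at + 1) : null
  if (!DIALECTS.has(name)) throw new Error(`unsupported driver: ${name}`)
  return { name, version, dialect: DIALECTS.get(name) }
}

// parseDriver('npm:mysql2@3')
// → { name: 'mysql2', version: '3', dialect: 'MysqlDialect' }
```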

# Config

Required:

  • driver: Module specifier for the database driver (e.g. npm:pg@8, npm:mysql2@3, npm:better-sqlite3@8)
  • connection: Function that returns the connection URL
  • query: Function, called with (event, db, SQL), that returns either a CompiledQuery or a SQL string to execute
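
Because query may return either form, it can help to see how the two could be reduced to one shape. The sketch below is an assumption about the contract, not the plugin's implementation, and the helper name normalizeQuery is hypothetical. It relies only on the fact that a Kysely CompiledQuery exposes sql and parameters.

```javascript
// Hypothetical helper: accept what a `query` function may return and
// yield a uniform { sql, parameters } object.
function normalizeQuery(result) {
  if (typeof result === 'string') {
    // A raw SQL string carries no bound parameters.
    return { sql: result, parameters: [] }
  }
  if (result && typeof result.sql === 'string') {
    // CompiledQuery-like object: copy the SQL text and its parameters.
    return { sql: result.sql, parameters: [...(result.parameters ?? [])] }
  }
  throw new TypeError('query must return a SQL string or a CompiledQuery')
}
```

One practical difference between the two forms: a CompiledQuery built with the Kysely DSL carries its values as bound parameters, while a SQL string contains whatever was interpolated into it.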

# Examples

This example demonstrates using the Kysely DSL along with the provided SQL() function for literal expressions.

pipeline:
  scheduler:
    config:
      cron: '* */5 * * * *'
      payload: {}
  sql:
    inputs: [scheduler]
    config:
      driver: 'npm:pg@8'
      connection: !!js/function >-
        () => `postgres://${process.env.USER}:${process.env.PASS}@localhost:5432/dbname`
      query: !!js/function >-
        function(event, db, SQL) {
          return db.selectFrom('tasks').selectAll().where('start', '<=', SQL('now()')).compile()
        }
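
The connection function above interpolates credentials directly into the URL, which can produce an invalid URL if the password contains characters such as @, : or /. A minimal sketch of percent-encoding them first follows; the helper name postgresUrl and its option names are hypothetical.

```javascript
// Hypothetical helper: build a PostgreSQL connection URL with
// percent-encoded credentials and database name.
function postgresUrl({ user, pass, host = 'localhost', port = 5432, database }) {
  const auth = `${encodeURIComponent(user)}:${encodeURIComponent(pass)}`
  return `postgres://${auth}@${host}:${port}/${encodeURIComponent(database)}`
}

// postgresUrl({ user: 'app', pass: 'p@ss:w/rd', database: 'dbname' })
// → 'postgres://app:p%40ss%3Aw%2Frd@localhost:5432/dbname'
```

Inside a pipeline config, the same encodeURIComponent() calls can be written inline in the connection function.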

This example demonstrates using SQL() to generate the entire query as well as sub-expressions.

pipeline:
  claim-foo-task:
    module: sql
    config:
      driver: 'npm:pg@8'
      connection: !!js/function >-
        () => `postgres://localhost/bussin`
      query: !!js/function >-
        function(event, db, SQL) {
          return SQL(`
            WITH picked AS (
              SELECT id FROM tasks WHERE
                status = 'queued' AND
                queue = 'default' AND
                key LIKE 'foo-%' AND
                start <= now()
              ORDER BY priority DESC, start, id
              FOR UPDATE SKIP LOCKED LIMIT 1)
            UPDATE tasks t SET
              status = 'running',
              locked = now(),
              worker = '${this.hostname}'
            FROM picked WHERE
              t.id = picked.id
            RETURNING t.*;
          `)
        }
  foo-task:
    module: js
    inputs: [claim-foo-task]
    config:
      function: !!js/function >-
        function(task) {
          task.results.foo = 'bar'
          return task
        }
  finish-task:
    module: sql
    inputs: [foo-task]
    config:
      driver: 'npm:pg@8'
      connection: !!js/function >-
        () => `postgres://localhost/bussin`
      query: !!js/function >-
        function(task, db, SQL) {
          return SQL(`
            UPDATE tasks SET
              status = 'succeeded'
            , results = '${JSON.stringify(task.results)}'::jsonb
            , locked = ${SQL('null')}
            , worker  = NULL
            WHERE id = ${task.id}
            RETURNING *;
          `)
        }
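
The finish-task query places JSON.stringify(task.results) between single quotes, so a result value containing an apostrophe would end the SQL literal early. One way to guard against that when building SQL strings by hand is sketched below. The helper name quoteLiteral is hypothetical, and the sketch assumes PostgreSQL with standard_conforming_strings enabled, where doubling a single quote is enough to escape it.

```javascript
// Hypothetical helper: quote a value as a SQL string literal by doubling
// embedded single quotes. Assumes PostgreSQL with
// standard_conforming_strings = on.
function quoteLiteral(value) {
  return `'${String(value).replace(/'/g, "''")}'`
}

// Example task whose results contain an apostrophe.
const task = { id: 7, results: { note: "it's done" } }
const assignment = `results = ${quoteLiteral(JSON.stringify(task.results))}::jsonb`
// assignment is: results = '{"note":"it''s done"}'::jsonb
```

Where the query can be expressed with the Kysely DSL instead, values are passed as bound parameters and this kind of manual quoting is not needed.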