#
sql
Connect to a SQL database at connection using driver, execute query, and emit each row as an event. When not connected to any inputs, the query will execute on pipeline start and then initiate pipeline shutdown.
This plugin uses Kysely to execute queries. The database driver for the desired dialect is installed at run-time. The following dialects are currently supported: PostgreSQL, MySQL, and SQLite. Other dialects may be supported if there is enough demand.
Kysely's DSL can be used for safe variable interpolation and better composeability of complex queries. The query may also be returned as a string. Use SQL(string) for literal expressions. Use the Kysely playground to experiment.
#
Config
Required:
driver: Module specifier (egnpm:pg@8,npm:mysql2@3,npm:better-sqlite3@8)connection: Function to return connection URLquery: Function that returns either a CompiledQuery or SQL string to execute
#
Examples
This example demonstrates using the Kysely DSL along with given SQL() function for literal expressions.
pipeline:
scheduler:
config:
cron: '* */5 * * * *'
payload: {}
sql:
inputs: [scheduler]
config:
driver: 'npm:pg@8'
connection: !!js/function >-
() => `postgres://${process.env.USER}:${process.env.PASS}@localhost:5432/dbname`
query: !!js/function >-
function(event, db, SQL) {
return db.selectFrom('tasks').selectAll().where('start', '<=', SQL('now()')).compile()
}
This example demonstrates using SQL() to generate the entire query as well as sub-expressions.
pipeline:
claim-foo-task:
module: sql
config:
driver: 'npm:pg@8'
connection: !!js/function >-
() => `postgres://localhost/bussin`
query: !!js/function >-
function(event, db, SQL) {
return SQL(`
WITH picked AS (
SELECT id FROM tasks WHERE
status = 'queued' AND
queue = 'default' AND
key LIKE 'foo-%' AND
start <= now()
ORDER BY priority DESC, start, id
FOR UPDATE SKIP LOCKED LIMIT 1)
UPDATE tasks t SET
status = 'running',
locked = now(),
worker = '${this.hostname}'
FROM picked WHERE
t.id = picked.id
RETURNING t.*;
`)
}
foo-task:
module: js
inputs: [claim-foo-task]
config:
function: !!js/function >-
function(task) {
task.results.foo = 'bar'
return task
}
finish-task:
module: sql
inputs: [foo-task]
config:
driver: 'npm:pg@8'
connection: !!js/function >-
() => `postgres://localhost/bussin`
query: !!js/function >-
function(task, db, SQL) {
return SQL(`
UPDATE tasks SET
status = 'succeeded'
, results = '${JSON.stringify(task.results)}'::jsonb
, locked = ${SQL('null')}
, worker = NULL
WHERE id = ${task.id}
RETURNING *;
`)
}