# pg_incremental

## Overview
| Package | Version | Category | License | Language |
|---|---|---|---|---|
| pg_incremental | 1.4.1 | FEAT | PostgreSQL | C |
| ID | Extension | Bin | Lib | Load | Create | Trust | Reloc | Schema |
|---|---|---|---|---|---|---|---|---|
| 2850 | pg_incremental | No | Yes | No | Yes | No | No | pg_catalog |
| Related |
|---|
| pg_cron, age, hll, rum, pg_graphql, pg_jsonschema, jsquery, pg_hint_plan |
## Version
| Type | Repo | Version | PG Ver | Package | Deps |
|---|---|---|---|---|---|
| EXT | PIGSTY | 1.4.1 | 18, 17, 16, 15, 14 | pg_incremental | pg_cron |
| RPM | PIGSTY | 1.4.1 | 18, 17, 16, 15, 14 | pg_incremental_$v | pg_cron_$v |
| DEB | PIGSTY | 1.4.1 | 18, 17, 16, 15, 14 | postgresql-$v-pg-incremental | postgresql-$v-cron |
## Build

You can build the RPM / DEB packages for pg_incremental using pig build:

```bash
pig build pkg pg_incremental   # build RPM / DEB packages
```
## Install

You can install pg_incremental directly. First, make sure the PGDG and PIGSTY repositories are added and enabled:

```bash
pig repo add pgsql -u   # add repos and update cache
```
Install the extension using pig, or with apt/yum/dnf:

```bash
pig ext install pg_incremental           # install for the current active PG version
pig ext install -y pg_incremental -v 18  # PG 18
pig ext install -y pg_incremental -v 17  # PG 17
pig ext install -y pg_incremental -v 16  # PG 16
```

```bash
dnf install -y pg_incremental_18   # PG 18 (EL)
dnf install -y pg_incremental_17   # PG 17 (EL)
dnf install -y pg_incremental_16   # PG 16 (EL)
```

```bash
apt install -y postgresql-18-pg-incremental  # PG 18 (Debian/Ubuntu)
apt install -y postgresql-17-pg-incremental  # PG 17 (Debian/Ubuntu)
apt install -y postgresql-16-pg-incremental  # PG 16 (Debian/Ubuntu)
```
Create the extension:

```sql
CREATE EXTENSION pg_incremental CASCADE; -- requires: pg_cron
```
## Usage

The pg_incremental extension provides fast, reliable incremental batch processing pipelines in PostgreSQL. It lets you define parameterized queries that execute periodically for new data, ensuring each batch is processed exactly once.

```sql
CREATE EXTENSION pg_incremental CASCADE; -- depends on pg_cron
```
### Pipeline Types
There are three types of pipelines:
- Sequence pipelines – Process ranges of sequence values from a table
- Time interval pipelines – Process time ranges after intervals pass
- File list pipelines – Process new files from a file listing function
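The sequence and time-interval examples below assume an `events` source table and an `events_agg` rollup table roughly like the following. This is a hypothetical sketch (table and column names are taken from the examples, but the exact schema is up to you); note that the primary key on `day` is what makes the `ON CONFLICT (day)` upsert in the sequence pipeline work:

```sql
-- Source table: the bigserial column provides the sequence that
-- a sequence pipeline tracks to find new rows
CREATE TABLE events (
    event_id    bigserial PRIMARY KEY,
    event_time  timestamptz NOT NULL DEFAULT now(),
    payload     jsonb
);

-- Rollup table: the primary key enables the ON CONFLICT upsert
CREATE TABLE events_agg (
    day         timestamptz PRIMARY KEY,
    event_count bigint NOT NULL DEFAULT 0
);
```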
### Sequence Pipeline

Create a pipeline that incrementally aggregates new rows using a sequence:

```sql
SELECT incremental.create_sequence_pipeline('event-aggregation', 'events', $$
  INSERT INTO events_agg
  SELECT date_trunc('day', event_time), count(*)
  FROM events
  WHERE event_id BETWEEN $1 AND $2
  GROUP BY 1
  ON CONFLICT (day) DO UPDATE
  SET event_count = events_agg.event_count + excluded.event_count
$$);
```
$1 and $2 are set to the lowest and highest sequence values that can be safely processed.
With batch size limiting:

```sql
SELECT incremental.create_sequence_pipeline(
  'event-aggregation', 'events',
  $$ ... $$,
  schedule := '* * * * *',
  max_batch_size := 10000
);
```
### Time Interval Pipeline

Process data in fixed time intervals:

```sql
SELECT incremental.create_time_interval_pipeline('event-aggregation', '1 day', $$
  INSERT INTO events_agg
  SELECT event_time::date, count(DISTINCT event_id)
  FROM events
  WHERE event_time >= $1 AND event_time < $2
  GROUP BY 1
$$);
```
$1 and $2 are set to the start and end (exclusive) of the time range.
For per-interval execution (e.g., daily exports):

```sql
SELECT incremental.create_time_interval_pipeline('event-export',
  time_interval := '1 day',
  batched := false,
  start_time := '2024-11-01',
  command := $$ SELECT export_events($1, $2) $$
);
```
### File List Pipeline

Process new files as they appear:

```sql
SELECT incremental.create_file_list_pipeline('event-import', 's3://mybucket/events/*.csv', $$
  SELECT import_events($1)
$$);
```
### Management Functions

| Function | Description |
|---|---|
| incremental.execute_pipeline(name) | Manually execute a pipeline (only if new data exists) |
| incremental.reset_pipeline(name) | Reset a pipeline to reprocess from the beginning |
| incremental.drop_pipeline(name) | Remove a pipeline |
| incremental.skip_file(pipeline, path) | Skip a faulty file in a file list pipeline |
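For example, using the pipeline names from the examples above (the truncate before reset and the `bad.csv` path are illustrative assumptions, not part of the API):

```sql
-- Run the pipeline immediately instead of waiting for its schedule
SELECT incremental.execute_pipeline('event-aggregation');

-- Reprocess everything from the start; clearing the rollup table first
-- avoids double-counting rows that were already aggregated
TRUNCATE events_agg;
SELECT incremental.reset_pipeline('event-aggregation');

-- Remove the pipeline entirely
SELECT incremental.drop_pipeline('event-aggregation');

-- Skip a file that keeps failing in a file list pipeline
-- (hypothetical path shown)
SELECT incremental.skip_file('event-import', 's3://mybucket/events/bad.csv');
```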
### Monitoring

```sql
SELECT * FROM incremental.sequence_pipelines;
SELECT * FROM incremental.time_interval_pipelines;
SELECT * FROM incremental.processed_files;
```
Check the outcomes of the most recent runs via pg_cron:

```sql
SELECT jobname, start_time, status, return_message
FROM cron.job_run_details JOIN cron.job USING (jobid)
WHERE jobname LIKE 'pipeline:%'
ORDER BY start_time DESC LIMIT 5;
```
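Since each pipeline is driven by a pg_cron job, you can also inspect the schedules themselves. A minimal sketch, assuming a pg_cron version where `cron.job` exposes the `jobname` column:

```sql
SELECT jobname, schedule, active
FROM cron.job
WHERE jobname LIKE 'pipeline:%';
```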