pg_incremental

Incremental Processing by Crunchy Data

Overview

| Package        | Version | Category | License    | Language |
|----------------|---------|----------|------------|----------|
| pg_incremental | 1.4.1   | FEAT     | PostgreSQL | C        |

| ID   | Extension      | Bin | Lib | Load | Create | Trust | Reloc | Schema     |
|------|----------------|-----|-----|------|--------|-------|-------|------------|
| 2850 | pg_incremental | No  | Yes | No   | Yes    | No    | No    | pg_catalog |

Related: pg_cron, age, hll, rum, pg_graphql, pg_jsonschema, jsquery, pg_hint_plan

Version

| Type | Repo   | Version | PG Ver             | Package                      | Deps               |
|------|--------|---------|--------------------|------------------------------|--------------------|
| EXT  | PIGSTY | 1.4.1   | 18, 17, 16, 15, 14 | pg_incremental               | pg_cron            |
| RPM  | PIGSTY | 1.4.1   | 18, 17, 16, 15, 14 | pg_incremental_$v            | pg_cron_$v         |
| DEB  | PIGSTY | 1.4.1   | 18, 17, 16, 15, 14 | postgresql-$v-pg-incremental | postgresql-$v-cron |
| OS / PG      | PG18         | PG17         | PG16         | PG15        | PG14        |
|--------------|--------------|--------------|--------------|-------------|-------------|
| el8.x86_64   |              |              |              | PIGSTY MISS | PIGSTY MISS |
| el8.aarch64  |              |              |              | PIGSTY MISS | PIGSTY MISS |
| el9.x86_64   |              |              |              | PIGSTY MISS | PIGSTY MISS |
| el9.aarch64  |              |              |              | PIGSTY MISS | PIGSTY MISS |
| el10.x86_64  |              |              |              | PIGSTY MISS | PIGSTY MISS |
| el10.aarch64 |              |              |              | PIGSTY MISS | PIGSTY MISS |
| d12.x86_64   | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY MISS | PIGSTY MISS |
| d12.aarch64  | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY MISS | PIGSTY MISS |
| d13.x86_64   | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY MISS | PIGSTY MISS |
| d13.aarch64  | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY MISS | PIGSTY MISS |
| u22.x86_64   | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY MISS | PIGSTY MISS |
| u22.aarch64  | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY MISS | PIGSTY MISS |
| u24.x86_64   | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY MISS | PIGSTY MISS |
| u24.aarch64  | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY 1.4.1 | PIGSTY MISS | PIGSTY MISS |

Build

You can build the RPM / DEB packages for pg_incremental using pig build:

pig build pkg pg_incremental         # build RPM / DEB packages

Install

You can install pg_incremental directly. First, make sure the PGDG and PIGSTY repositories are added and enabled:

pig repo add pgsql -u          # Add repo and update cache

Install the extension using pig or apt/yum/dnf:

pig install pg_incremental           # Install for current active PG version
pig ext install -y pg_incremental -v 18  # PG 18
pig ext install -y pg_incremental -v 17  # PG 17
pig ext install -y pg_incremental -v 16  # PG 16
dnf install -y pg_incremental_18       # PG 18
dnf install -y pg_incremental_17       # PG 17
dnf install -y pg_incremental_16       # PG 16
apt install -y postgresql-18-pg-incremental   # PG 18
apt install -y postgresql-17-pg-incremental   # PG 17
apt install -y postgresql-16-pg-incremental   # PG 16

Create Extension:

CREATE EXTENSION pg_incremental CASCADE;  -- requires: pg_cron

Usage

pg_incremental: Incremental Data Processing in PostgreSQL

The pg_incremental extension provides fast, reliable incremental batch processing pipelines in PostgreSQL. You define a parameterized query that is executed periodically over new data, with exactly-once processing guarantees.

CREATE EXTENSION pg_incremental CASCADE;  -- depends on pg_cron

Pipeline Types

There are three types of pipelines:

  • Sequence pipelines – Process ranges of sequence values from a table
  • Time interval pipelines – Process time ranges after intervals pass
  • File list pipelines – Process new files from a file listing function

Sequence Pipeline

Create a pipeline that incrementally aggregates new rows using a sequence:

SELECT incremental.create_sequence_pipeline('event-aggregation', 'events', $$
  INSERT INTO events_agg
  SELECT date_trunc('day', event_time), count(*)
  FROM events
  WHERE event_id BETWEEN $1 AND $2
  GROUP BY 1
  ON CONFLICT (day) DO UPDATE SET event_count = events_agg.event_count + excluded.event_count
$$);

$1 and $2 are set to the lowest and highest new sequence values that can be safely processed, i.e. values for which all concurrent writes have committed.
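For reference, a schema the pipeline above could run against. This is an illustrative sketch, not part of pg_incremental: the key points are that event_id is backed by a sequence (bigserial), which is what the sequence pipeline tracks, and that events_agg has a unique key on day, which the ON CONFLICT clause relies on.

```sql
-- Source table: event_id comes from a sequence, tracked by the pipeline.
CREATE TABLE events (
    event_id   bigserial PRIMARY KEY,
    event_time timestamptz NOT NULL DEFAULT now(),
    payload    jsonb
);

-- Aggregation target: one row per day; the primary key on "day" backs
-- the ON CONFLICT (day) DO UPDATE in the pipeline command.
CREATE TABLE events_agg (
    day         timestamptz PRIMARY KEY,
    event_count bigint NOT NULL DEFAULT 0
);
```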

With batch size limiting:

SELECT incremental.create_sequence_pipeline(
  'event-aggregation', 'events',
  $$ ... $$,
  schedule := '* * * * *',
  max_batch_size := 10000
);

Time Interval Pipeline

Process data in fixed time intervals:

SELECT incremental.create_time_interval_pipeline('event-aggregation', '1 day', $$
  INSERT INTO events_agg
  SELECT event_time::date, count(distinct event_id)
  FROM events
  WHERE event_time >= $1 AND event_time < $2
  GROUP BY 1
$$);

$1 and $2 are set to the inclusive start and exclusive end of the time range.
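To make the parameter binding concrete: by default the pipeline is batched, so one execution covers the whole range of elapsed, unprocessed intervals. With hypothetical timestamps, a run after two unprocessed days would effectively execute:

```sql
-- Illustration only: $1 and $2 bound to a two-day range
--   $1 = '2024-11-02 00:00:00+00'  -- inclusive start
--   $2 = '2024-11-04 00:00:00+00'  -- exclusive end
INSERT INTO events_agg
SELECT event_time::date, count(distinct event_id)
FROM events
WHERE event_time >= '2024-11-02 00:00:00+00'
  AND event_time <  '2024-11-04 00:00:00+00'
GROUP BY 1;
```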

For per-interval execution (e.g., daily exports):

SELECT incremental.create_time_interval_pipeline('event-export',
  time_interval := '1 day',
  batched := false,
  start_time := '2024-11-01',
  command := $$ SELECT export_events($1, $2) $$
);

File List Pipeline

Process new files as they appear:

SELECT incremental.create_file_list_pipeline('event-import', 's3://mybucket/events/*.csv', $$
  SELECT import_events($1)
$$);

Management Functions

| Function                              | Description                                            |
|---------------------------------------|--------------------------------------------------------|
| incremental.execute_pipeline(name)    | Manually execute a pipeline (runs only if new data exists) |
| incremental.reset_pipeline(name)      | Reset a pipeline to reprocess from the beginning       |
| incremental.drop_pipeline(name)       | Remove a pipeline                                      |
| incremental.skip_file(pipeline, path) | Skip a faulty file in a file list pipeline             |
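For example, using the pipeline name from the sequence example above. Note that after a reset, re-running an ON CONFLICT ... DO UPDATE aggregation would double-count, so clear the target table first:

```sql
-- Run the pipeline by hand; this is a no-op when there is no new data.
SELECT incremental.execute_pipeline('event-aggregation');

-- Reprocess from the beginning; truncate the target first so the
-- ON CONFLICT ... DO UPDATE command does not double-count.
TRUNCATE events_agg;
SELECT incremental.reset_pipeline('event-aggregation');

-- Remove the pipeline when it is no longer needed.
SELECT incremental.drop_pipeline('event-aggregation');
```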

Monitoring

SELECT * FROM incremental.sequence_pipelines;
SELECT * FROM incremental.time_interval_pipelines;
SELECT * FROM incremental.processed_files;

Check job outcomes via pg_cron:

SELECT jobname, start_time, status, return_message
FROM cron.job_run_details JOIN cron.job USING (jobid)
WHERE jobname LIKE 'pipeline:%' ORDER BY start_time DESC LIMIT 5;

Last Modified 2026-03-12: add pg extension catalog (95749bf)