pgq

Generic queue for PostgreSQL

Overview

Package  Version  Category  License  Language
pgq      3.5.1    FEAT      ISC      C

ID    Extension  Bin  Lib  Load  Create  Trust  Reloc  Schema
2890  pgq        No   Yes  No    Yes     No     No     pg_catalog

Related: age, hll, rum, pg_graphql, pg_jsonschema, jsquery, pg_hint_plan, hypopg

Version

Type  Repo  Version  PG Ver              Package             Deps
EXT   PGDG  3.5.1    18, 17, 16, 15, 14  pgq                 -
RPM   PGDG  3.5.1    18, 17, 16, 15, 14  pgq_$v              -
DEB   PGDG  3.5.1    18, 17, 16, 15, 14  postgresql-$v-pgq3  -
OS / PG  PG18  PG17  PG16  PG15  PG14
el8.x86_64
el8.aarch64
el9.x86_64
el9.aarch64
el10.x86_64
el10.aarch64
d12.x86_64
d12.aarch64
PGDG 3.5.1
PGDG 3.5.1
PGDG 3.5.1
PGDG 3.5.1
PGDG 3.5.1
d13.x86_64
d13.aarch64
PGDG 3.5.1
PGDG 3.5.1
PGDG 3.5.1
PGDG 3.5.1
PGDG 3.5.1
u22.x86_64
u22.aarch64
u24.x86_64
u24.aarch64

Install

You can install pgq directly. First, make sure the PGDG repository is added and enabled:

pig repo add pgdg -u          # Add PGDG repo and update cache

Install the extension using pig or apt/yum/dnf:

pig install pgq               # Install for current active PG version
pig ext install -y pgq -v 18  # PG 18
pig ext install -y pgq -v 17  # PG 17
pig ext install -y pgq -v 16  # PG 16
pig ext install -y pgq -v 15  # PG 15
pig ext install -y pgq -v 14  # PG 14
dnf install -y pgq_18       # PG 18
dnf install -y pgq_17       # PG 17
dnf install -y pgq_16       # PG 16
dnf install -y pgq_15       # PG 15
dnf install -y pgq_14       # PG 14
apt install -y postgresql-18-pgq3   # PG 18
apt install -y postgresql-17-pgq3   # PG 17
apt install -y postgresql-16-pgq3   # PG 16
apt install -y postgresql-15-pgq3   # PG 15
apt install -y postgresql-14-pgq3   # PG 14

Create Extension:

CREATE EXTENSION pgq;
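
To confirm that the package is visible to the server and that the extension was created, the standard PostgreSQL catalogs can be queried. This is a minimal sketch that uses only the built-in `pg_available_extensions` view and `pg_extension` catalog; the version values returned depend on the installed package.

```sql
-- Is the pgq package visible to this server, and at which version?
SELECT name, default_version, installed_version
FROM pg_available_extensions
WHERE name = 'pgq';

-- After CREATE EXTENSION, the extension shows up in pg_extension.
SELECT extname, extversion
FROM pg_extension
WHERE extname = 'pgq';
```

If the first query returns no rows, the package is most likely not installed for the PostgreSQL major version this server is running.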

Usage

pgq: Generic high-performance lockless queue for PostgreSQL

PgQ is a PostgreSQL extension that provides a generic, high-performance lockless queue with a simple SQL function API. It uses a producer-consumer model with batch-based event processing.

CREATE EXTENSION pgq;

Core Concepts

  • Queue: A named event stream. Events are inserted by producers and consumed in batches.
  • Consumer: A named subscriber registered to a queue. Each consumer tracks its own position.
  • Batch: A group of events retrieved together. Consumers process events batch by batch.
  • Ticker: A background process that creates batch boundaries (ticks) at regular intervals.

Queue Management

-- Create a queue
SELECT pgq.create_queue('myqueue');

-- Drop a queue
SELECT pgq.drop_queue('myqueue');

-- Get queue info
SELECT * FROM pgq.get_queue_info();
SELECT * FROM pgq.get_queue_info('myqueue');
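
For monitoring, it is often enough to pick a few columns out of the queue info instead of selecting everything. The sketch below assumes the column names exposed by `pgq.get_queue_info` in PgQ 3.x (`queue_name`, `ticker_lag`, `ev_per_sec`, `ev_new`, `last_tick_id`); check them against your installed version with `SELECT *` first.

```sql
-- Compact health view for all queues (column names assumed from PgQ 3.x).
SELECT queue_name,
       ticker_lag,     -- time since the last tick; grows if the ticker is not running
       ev_per_sec,     -- recent event rate
       ev_new,         -- events inserted since the last tick
       last_tick_id
FROM pgq.get_queue_info()
ORDER BY queue_name;
```

A steadily growing `ticker_lag` is usually the first sign that no ticker is running against the database.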

Consumer Registration

-- Register a consumer on a queue
SELECT pgq.register_consumer('myqueue', 'myconsumer');

-- Unregister a consumer
SELECT pgq.unregister_consumer('myqueue', 'myconsumer');

-- Get consumer info
SELECT * FROM pgq.get_consumer_info('myqueue');
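
The consumer info can also be filtered to spot consumers that have fallen behind. This is a sketch assuming the PgQ 3.x column names (`consumer_name`, `lag`, `last_seen`, `pending_events`); the five-minute threshold is an arbitrary illustrative value.

```sql
-- List consumers on 'myqueue' that lag more than 5 minutes behind.
SELECT consumer_name,
       lag,             -- age of the last batch the consumer finished
       last_seen,       -- time since the consumer last contacted the queue
       pending_events   -- approximate number of events still to be processed
FROM pgq.get_consumer_info('myqueue')
WHERE lag > interval '5 minutes'
ORDER BY lag DESC;
```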

Producing Events

-- Insert an event into a queue
SELECT pgq.insert_event('myqueue', 'event_type', 'event_data');

-- Insert with extra fields
SELECT pgq.insert_event('myqueue', 'event_type', 'event_data',
                         'extra1', 'extra2', 'extra3', 'extra4');
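
Event type and data are plain text, so structured payloads have to be serialized by the producer. The sketch below sends a JSON document as the event data; the event type `user.created` and the payload fields are hypothetical names chosen for illustration.

```sql
-- Serialize a structured payload to text before inserting it.
-- insert_event returns the ID assigned to the new event.
SELECT pgq.insert_event(
    'myqueue',
    'user.created',                                    -- hypothetical event type
    json_build_object('user_id', 42,
                      'email', 'alice@example.com')::text
) AS ev_id;
```

Consumers then receive the same string in `ev_data` and can cast it back with `ev_data::json`.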

Consuming Events

-- Get the next batch of events (returns batch_id or NULL if no new batches)
SELECT pgq.next_batch('myqueue', 'myconsumer');

-- Get events from the batch
SELECT * FROM pgq.get_batch_events(:batch_id);

-- Retry a failed event (will reappear after the specified interval)
SELECT pgq.event_retry(:batch_id, :event_id, :retry_seconds);

-- Mark batch as done
SELECT pgq.finish_batch(:batch_id);

Typical Consumer Loop

-- 1. Get next batch
SELECT pgq.next_batch('myqueue', 'myconsumer') AS batch_id;

-- 2. If batch_id is not NULL, get events
SELECT * FROM pgq.get_batch_events(:batch_id);

-- 3. Process events, retry failures
SELECT pgq.event_retry(:batch_id, :event_id, 60);

-- 4. Finish the batch
SELECT pgq.finish_batch(:batch_id);
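
The four steps above can be combined into one pass of a consumer, sketched here as a PL/pgSQL `DO` block. It assumes the queue `myqueue` and the consumer `myconsumer` already exist; the processing step is a placeholder that only prints each event, and the 60-second retry delay is taken from the example above. A real consumer would normally run this logic from an application or a scheduled job, in a loop.

```sql
DO $$
DECLARE
    v_batch_id bigint;
    v_event    record;
BEGIN
    -- 1. Ask for the next batch; NULL means no new batch is available yet.
    v_batch_id := pgq.next_batch('myqueue', 'myconsumer');
    IF v_batch_id IS NULL THEN
        RAISE NOTICE 'no batch available, try again later';
        RETURN;
    END IF;

    -- 2. Walk through the events of the batch.
    FOR v_event IN
        SELECT ev_id, ev_type, ev_data
        FROM pgq.get_batch_events(v_batch_id)
    LOOP
        BEGIN
            -- 3. Placeholder processing step: replace with real work.
            RAISE NOTICE 'event % (%): %',
                v_event.ev_id, v_event.ev_type, v_event.ev_data;
        EXCEPTION WHEN OTHERS THEN
            -- On failure, ask PgQ to redeliver this event in 60 seconds.
            PERFORM pgq.event_retry(v_batch_id, v_event.ev_id, 60);
        END;
    END LOOP;

    -- 4. Close the batch so the consumer can move on to the next one.
    PERFORM pgq.finish_batch(v_batch_id);
END
$$;
```

Because the whole block runs in one transaction, the processing results and the call to `finish_batch` are committed together or not at all.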

Maintenance

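PgQ requires a ticker daemon (pgqd) running in the background. The ticker creates the batch boundaries (ticks) and performs maintenance tasks such as table rotation and retry event processing. Without a running ticker no new ticks are created, so pgq.next_batch keeps returning NULL even when events have been inserted.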
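
For local experiments without pgqd, the ticker's work can be triggered by hand from SQL. This is a development-only sketch, assuming the `pgq.force_tick`, `pgq.ticker`, and `pgq.maint_retry_events` functions shipped with PgQ 3.x; it is not a replacement for the daemon, and a tick is only created when the queue's ticker thresholds are met.

```sql
-- Development-only: do the ticker's work manually instead of running pgqd.

-- Make 'myqueue' eligible for a tick even though few events were inserted.
SELECT pgq.force_tick('myqueue');

-- Create ticks for all queues that are due for one.
SELECT pgq.ticker();

-- Move retry events whose delay has expired back into their queues.
SELECT pgq.maint_retry_events();
```

After a tick has been created, `pgq.next_batch('myqueue', 'myconsumer')` should return a batch ID instead of NULL.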

Key Functions

Function                                      Description
pgq.create_queue(name)                        Create a new queue
pgq.drop_queue(name)                          Remove a queue
pgq.register_consumer(queue, consumer)        Register a consumer
pgq.unregister_consumer(queue, consumer)      Unregister a consumer
pgq.insert_event(queue, type, data, ...)      Insert an event
pgq.next_batch(queue, consumer)               Get next batch ID
pgq.get_batch_events(batch_id)                Get events from a batch
pgq.event_retry(batch_id, event_id, seconds)  Schedule event retry
pgq.finish_batch(batch_id)                    Mark batch as processed
pgq.get_queue_info([name])                    Get queue statistics
pgq.get_consumer_info(queue)                  Get consumer statistics

Last Modified 2026-03-12: add pg extension catalog (95749bf)