pgq
Generic queue for PostgreSQL
Overview
| Package | Version | Category | License | Language |
|---|---|---|---|---|
pgq | 3.5.1 | FEAT | ISC | C |
| ID | Extension | Bin | Lib | Load | Create | Trust | Reloc | Schema |
|---|---|---|---|---|---|---|---|---|
| 2890 | pgq | No | Yes | No | Yes | No | No | pg_catalog |
| Related | age hll rum pg_graphql pg_jsonschema jsquery pg_hint_plan hypopg |
|---|
Version
| Type | Repo | Version | PG Ver | Package | Deps |
|---|---|---|---|---|---|
| EXT | PGDG | 3.5.1 | 1817161514 | pgq | - |
| RPM | PGDG | 3.5.1 | 1817161514 | pgq_$v | - |
| DEB | PGDG | 3.5.1 | 1817161514 | postgresql-$v-pgq3 | - |
Install
You can install pgq directly. First, make sure the PGDG repository is added and enabled:
pig repo add pgdg -u # Add PGDG repo and update cache
Install the extension using pig or apt/yum/dnf:
pig install pgq; # Install for current active PG version
pig ext install -y pgq -v 18 # PG 18
pig ext install -y pgq -v 17 # PG 17
pig ext install -y pgq -v 16 # PG 16
pig ext install -y pgq -v 15 # PG 15
pig ext install -y pgq -v 14 # PG 14
dnf install -y pgq_18 # PG 18
dnf install -y pgq_17 # PG 17
dnf install -y pgq_16 # PG 16
dnf install -y pgq_15 # PG 15
dnf install -y pgq_14 # PG 14
apt install -y postgresql-18-pgq3 # PG 18
apt install -y postgresql-17-pgq3 # PG 17
apt install -y postgresql-16-pgq3 # PG 16
apt install -y postgresql-15-pgq3 # PG 15
apt install -y postgresql-14-pgq3 # PG 14
Create Extension:
CREATE EXTENSION pgq;
Usage
PgQ is a PostgreSQL extension that provides a generic, high-performance lockless queue with a simple SQL function API. It uses a producer-consumer model with batch-based event processing.
CREATE EXTENSION pgq;
Core Concepts
- Queue: A named event stream. Events are inserted by producers and consumed in batches.
- Consumer: A named subscriber registered to a queue. Each consumer tracks its own position.
- Batch: A group of events retrieved together. Consumers process events batch by batch.
- Ticker: A background process that creates batch boundaries (ticks) at regular intervals.
Queue Management
-- Create a queue
SELECT pgq.create_queue('myqueue');
-- Drop a queue
SELECT pgq.drop_queue('myqueue');
-- Get queue info
SELECT * FROM pgq.get_queue_info();
SELECT * FROM pgq.get_queue_info('myqueue');
Consumer Registration
-- Register a consumer on a queue
SELECT pgq.register_consumer('myqueue', 'myconsumer');
-- Unregister a consumer
SELECT pgq.unregister_consumer('myqueue', 'myconsumer');
-- Get consumer info
SELECT * FROM pgq.get_consumer_info('myqueue');
Producing Events
-- Insert an event into a queue
SELECT pgq.insert_event('myqueue', 'event_type', 'event_data');
-- Insert with extra fields
SELECT pgq.insert_event('myqueue', 'event_type', 'event_data',
'extra1', 'extra2', 'extra3', 'extra4');
Consuming Events
-- Get the next batch of events (returns batch_id or NULL if no new batches)
SELECT pgq.next_batch('myqueue', 'myconsumer');
-- Get events from the batch
SELECT * FROM pgq.get_batch_events(:batch_id);
-- Retry a failed event (will reappear after the specified interval)
SELECT pgq.event_retry(:batch_id, :event_id, :retry_seconds);
-- Mark batch as done
SELECT pgq.finish_batch(:batch_id);
Typical Consumer Loop
-- 1. Get next batch
SELECT pgq.next_batch('myqueue', 'myconsumer') AS batch_id;
-- 2. If batch_id is not NULL, get events
SELECT * FROM pgq.get_batch_events(:batch_id);
-- 3. Process events, retry failures
SELECT pgq.event_retry(:batch_id, :event_id, 60);
-- 4. Finish the batch
SELECT pgq.finish_batch(:batch_id);
Maintenance
PgQ requires a ticker daemon (pgqd) to run in the background for creating batch boundaries and performing maintenance tasks like table rotation and retry event processing.
Key Functions
| Function | Description |
|---|---|
pgq.create_queue(name) | Create a new queue |
pgq.drop_queue(name) | Remove a queue |
pgq.register_consumer(queue, consumer) | Register a consumer |
pgq.unregister_consumer(queue, consumer) | Unregister a consumer |
pgq.insert_event(queue, type, data, ...) | Insert an event |
pgq.next_batch(queue, consumer) | Get next batch ID |
pgq.get_batch_events(batch_id) | Get events from a batch |
pgq.event_retry(batch_id, event_id, seconds) | Schedule event retry |
pgq.finish_batch(batch_id) | Mark batch as processed |
pgq.get_queue_info([name]) | Get queue statistics |
pgq.get_consumer_info(queue) | Get consumer statistics |
Feedback
Was this page helpful?
Thanks for the feedback! Please let us know how we can improve.
Sorry to hear that. Please let us know how we can improve.