pgmb

A simple PostgreSQL Message Broker system

Overview

PackageVersionCategoryLicenseLanguage
pgmb1.0.0FEATPostgreSQLSQL
IDExtensionBinLibLoadCreateTrustRelocSchema
2870pgmbNoNoNoYesNoNopgmb
Relatedpg_cron http pgmq pgq pg_task pg_cron pg_background pg_later pg_net kafka_fdw

Version

TypeRepoVersionPG VerPackageDeps
EXTPIGSTY1.0.01817161514pgmbpg_cron, http
RPMPIGSTY1.0.01817161514pgmb_$vpg_cron_$v, pg_http_$v
DEBPIGSTY1.0.01817161514postgresql-$v-pgmbpostgresql-$v-cron, postgresql-$v-http
OS / PGPG18PG17PG16PG15PG14
el8.x86_64
el8.aarch64
el9.x86_64
el9.aarch64
el10.x86_64
el10.aarch64
d12.x86_64
d12.aarch64
d13.x86_64
d13.aarch64
PIGSTY 1.0.0
PIGSTY 1.0.0
PIGSTY 1.0.0
PIGSTY 1.0.0
PIGSTY 1.0.0
u22.x86_64
u22.aarch64
PIGSTY 1.0.0
PIGSTY 1.0.0
PIGSTY 1.0.0
PIGSTY 1.0.0
PIGSTY 1.0.0
u24.x86_64
u24.aarch64
PIGSTY 1.0.0
PIGSTY 1.0.0
PIGSTY 1.0.0
PIGSTY 1.0.0
PIGSTY 1.0.0

Build

You can build the RPM / DEB packages for pgmb using pig build:

pig build pkg pgmb         # build RPM / DEB packages

Install

You can install pgmb directly. First, make sure the PGDG and PIGSTY repositories are added and enabled:

pig repo add pgsql -u          # Add repo and update cache

Install the extension using pig or apt/yum/dnf:

pig install pgmb;          # Install for current active PG version
pig ext install -y pgmb -v 18  # PG 18
pig ext install -y pgmb -v 17  # PG 17
pig ext install -y pgmb -v 16  # PG 16
pig ext install -y pgmb -v 15  # PG 15
pig ext install -y pgmb -v 14  # PG 14
dnf install -y pgmb_18       # PG 18
dnf install -y pgmb_17       # PG 17
dnf install -y pgmb_16       # PG 16
dnf install -y pgmb_15       # PG 15
dnf install -y pgmb_14       # PG 14
apt install -y postgresql-18-pgmb   # PG 18
apt install -y postgresql-17-pgmb   # PG 17
apt install -y postgresql-16-pgmb   # PG 16
apt install -y postgresql-15-pgmb   # PG 15
apt install -y postgresql-14-pgmb   # PG 14

Create Extension:

CREATE EXTENSION pgmb CASCADE;  -- requires: pg_cron, http

Usage

pgmb: A lightweight message broker system built inside PostgreSQL

The pgmb extension provides an in-database message broker with HTTP-based worker dispatch, automatic retries, dead letter queues, and pattern-based routing.

CREATE EXTENSION pgmb;  -- requires pg_cron and http extensions

Register a Worker

SELECT pgmb.worker(
    'order_processor',                     -- worker name
    'http://localhost:8080/process',       -- endpoint URL
    100                                    -- requests per second limit
);
-- Returns: worker UUID

Create a Queue

SELECT pgmb.create(
    'order_queue',                         -- queue name
    'order.*',                             -- binding key pattern (supports * wildcard)
    5,                                     -- max retries
    '550e8400-e29b-41d4-a716-446655440000' -- worker UUID
);
-- Returns: queue UUID

Send Messages

-- Simple message
SELECT pgmb.send(
    gen_random_uuid(),
    'order.created',
    '{"order_id": 123, "amount": 45.67}'::jsonb
);

-- With custom headers
SELECT pgmb.send(
    gen_random_uuid(),
    'order.created',
    '{"order_id": 123}'::jsonb,
    '{"source": "web", "priority": "high"}'::jsonb
);

-- Delayed message (by timestamp or seconds)
SELECT pgmb.send(
    gen_random_uuid(),
    'order.created',
    '{"order_id": 123}'::jsonb,
    '{}'::jsonb,
    now() + interval '10 minutes'
);

API Reference

FunctionDescription
pgmb.worker(name, endpoint, rps)Register an HTTP worker endpoint
pgmb.create(name, binding_key, max_retries, worker_id)Create a queue with routing pattern
pgmb.send(id, routing_key, body)Send a message
pgmb.send(id, routing_key, body, headers)Send a message with headers
pgmb.send(id, routing_key, body, headers, delay)Send a delayed message

How It Works

  1. Messages are inserted into pgmb.messages via pgmb.send()
  2. A trigger routes messages to matching queues based on routing key patterns
  3. pg_cron dispatches messages via HTTP POST to worker endpoints every second
  4. Failed messages are retried; after max retries they move to a dead letter queue

Monitoring

SELECT * FROM pgmb.workers;
SELECT * FROM pgmb.queues;
SELECT COUNT(*) FROM pgmb.order_queue WHERE acknoledge = false;
SELECT * FROM pgmb.order_dead_letter_queue;

Last Modified 2026-03-12: add pg extension catalog (95749bf)