kafka_fdw

Kafka Foreign Data Wrapper for CSV-formatted messages

Overview

| Package   | Version | Category | License    | Language |
|-----------|---------|----------|------------|----------|
| kafka_fdw | 0.0.3   | FDW      | PostgreSQL | C        |

| ID   | Extension | Bin | Lib | Load | Create | Trust | Reloc | Schema |
|------|-----------|-----|-----|------|--------|-------|-------|--------|
| 8730 | kafka_fdw | No  | Yes | No   | Yes    | No    | Yes   | -      |

Related: pgmq, mongo_fdw, redis_fdw, wrappers, multicorn, redis, hdfs_fdw, wal2json

Version

| Type | Repo   | Version | PG Ver             | Package                 | Deps |
|------|--------|---------|--------------------|-------------------------|------|
| EXT  | PIGSTY | 0.0.3   | 18, 17, 16, 15, 14 | kafka_fdw               | -    |
| RPM  | PIGSTY | 0.0.3   | 18, 17, 16, 15, 14 | kafka_fdw_$v            | -    |
| DEB  | PIGSTY | 0.0.3   | 18, 17, 16, 15, 14 | postgresql-$v-kafka-fdw | -    |
| OS / PG       | PG18         | PG17         | PG16         | PG15         | PG14         |
|---------------|--------------|--------------|--------------|--------------|--------------|
| el8.x86_64    |              |              |              |              |              |
| el8.aarch64   |              |              |              |              |              |
| el9.x86_64    |              |              |              |              |              |
| el9.aarch64   |              |              |              |              |              |
| el10.x86_64   |              |              |              |              |              |
| el10.aarch64  |              |              |              |              |              |
| d12.x86_64    |              |              |              |              |              |
| d12.aarch64   |              |              |              |              |              |
| d13.x86_64    | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| d13.aarch64   | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| u22.x86_64    | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| u22.aarch64   | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| u24.x86_64    | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |
| u24.aarch64   | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 | PIGSTY 0.0.3 |

Build

You can build the RPM / DEB packages for kafka_fdw using pig build:

pig build pkg kafka_fdw         # build RPM / DEB packages

Install

You can install kafka_fdw directly. First, make sure the PGDG and PIGSTY repositories are added and enabled:

pig repo add pgsql -u          # Add repo and update cache

Install the extension using pig or apt/yum/dnf:

pig install kafka_fdw           # Install for current active PG version
pig ext install -y kafka_fdw -v 18  # PG 18
pig ext install -y kafka_fdw -v 17  # PG 17
pig ext install -y kafka_fdw -v 16  # PG 16
pig ext install -y kafka_fdw -v 15  # PG 15
pig ext install -y kafka_fdw -v 14  # PG 14
dnf install -y kafka_fdw_18       # PG 18
dnf install -y kafka_fdw_17       # PG 17
dnf install -y kafka_fdw_16       # PG 16
dnf install -y kafka_fdw_15       # PG 15
dnf install -y kafka_fdw_14       # PG 14
apt install -y postgresql-18-kafka-fdw   # PG 18
apt install -y postgresql-17-kafka-fdw   # PG 17
apt install -y postgresql-16-kafka-fdw   # PG 16
apt install -y postgresql-15-kafka-fdw   # PG 15
apt install -y postgresql-14-kafka-fdw   # PG 14

Create Extension:

CREATE EXTENSION kafka_fdw;

Usage

kafka_fdw: Kafka Foreign Data Wrapper for CSV- and JSON-formatted messages

Create Server

CREATE EXTENSION kafka_fdw;

CREATE SERVER kafka_server FOREIGN DATA WRAPPER kafka_fdw
  OPTIONS (brokers 'localhost:9092');

Server Options: brokers (required, comma-separated Kafka broker endpoints).
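
Since brokers accepts a comma-separated list, a multi-broker server can be defined as follows; this is a sketch, and the host names are placeholders for your own cluster endpoints:

```sql
-- Placeholder broker endpoints; replace with your cluster's addresses
CREATE SERVER kafka_cluster FOREIGN DATA WRAPPER kafka_fdw
  OPTIONS (brokers 'kafka1:9092,kafka2:9092,kafka3:9092');
```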

Create User Mapping

CREATE USER MAPPING FOR PUBLIC SERVER kafka_server;

Create Foreign Table (CSV Format)

CREATE FOREIGN TABLE kafka_csv (
  part int OPTIONS (partition 'true'),
  offs bigint OPTIONS (offset 'true'),
  some_int int,
  some_text text,
  some_date date,
  some_time timestamp
)
SERVER kafka_server
OPTIONS (format 'csv', topic 'my_topic', batch_size '30', buffer_delay '100');

Two metadata columns are required: one with partition 'true' and one with offset 'true'. The remaining columns match the message format.

Table Options: format (csv or json), topic (Kafka topic name), batch_size, buffer_delay (milliseconds), strict (enforce strict schema validation), ignore_junk (set malformed columns to NULL).
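
As a sketch of the parsing options above, a more lenient table definition might combine them like this; the 'true' value for ignore_junk is an assumption based on the boolean column options shown elsewhere on this page:

```sql
-- Hypothetical table: with ignore_junk, malformed CSV columns become NULL
-- instead of raising an error (option value 'true' assumed)
CREATE FOREIGN TABLE kafka_csv_lenient (
  part int OPTIONS (partition 'true'),
  offs bigint OPTIONS (offset 'true'),
  some_int int,
  some_text text
)
SERVER kafka_server
OPTIONS (format 'csv', topic 'my_topic', ignore_junk 'true');
```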

Create Foreign Table (JSON Format)

CREATE FOREIGN TABLE kafka_json (
  part int OPTIONS (partition 'true'),
  offs bigint OPTIONS (offset 'true'),
  some_int int OPTIONS (json 'int_val'),
  some_text text OPTIONS (json 'text_val')
)
SERVER kafka_server
OPTIONS (format 'json', topic 'my_json_topic', batch_size '30', buffer_delay '100');

Use the json column option to map column names to JSON keys.
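
For example, with the mapping above, a message body such as {"int_val": 42, "text_val": "hello"} on my_json_topic would surface as a row with some_int = 42 and some_text = 'hello':

```sql
-- Given message {"int_val": 42, "text_val": "hello"} in partition 0
SELECT some_int, some_text FROM kafka_json WHERE part = 0 AND offs >= 0;
```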

Consuming Messages

-- Read from a specific partition and offset
SELECT * FROM kafka_csv WHERE part = 0 AND offs > 1000 LIMIT 60;

-- Read from multiple partitions
SELECT * FROM kafka_csv
WHERE (part = 0 AND offs > 100) OR (part = 1 AND offs > 300);

Note: OFFSET is a reserved keyword in SQL. The examples above avoid it by naming the metadata column offs; if you name that column offset instead, you must double-quote it when referencing it in queries.
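
For instance, with a hypothetical foreign table whose offset metadata column is named offset rather than offs, the reserved word needs quoting:

```sql
-- "offset" must be double-quoted because OFFSET is a reserved SQL keyword
SELECT * FROM kafka_quoted WHERE part = 0 AND "offset" > 1000;
```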

Producing Messages

-- Insert with explicit partition
INSERT INTO kafka_csv (part, some_int, some_text)
  VALUES (0, 42, 'hello from partition 0');

-- Insert with auto-partition selection
INSERT INTO kafka_csv (some_int, some_text)
  VALUES (42, 'auto-partitioned message');

Last Modified 2026-03-12: add pg extension catalog (95749bf)