# kafka_fdw

## Overview
| Package | Version | Category | License | Language |
|---|---|---|---|---|
| kafka_fdw | 0.0.3 | FDW | PostgreSQL | C |
| ID | Extension | Bin | Lib | Load | Create | Trust | Reloc | Schema |
|---|---|---|---|---|---|---|---|---|
| 8730 | kafka_fdw | No | Yes | No | Yes | No | Yes | - |
| Related |
|---|
| pgmq, mongo_fdw, redis_fdw, wrappers, multicorn, redis, hdfs_fdw, wal2json |
## Version
| Type | Repo | Version | PG Ver | Package | Deps |
|---|---|---|---|---|---|
| EXT | PIGSTY | 0.0.3 | 18, 17, 16, 15, 14 | kafka_fdw | - |
| RPM | PIGSTY | 0.0.3 | 18, 17, 16, 15, 14 | kafka_fdw_$v | - |
| DEB | PIGSTY | 0.0.3 | 18, 17, 16, 15, 14 | postgresql-$v-kafka-fdw | - |
## Build

You can build the RPM / DEB packages for kafka_fdw using pig build:

```bash
pig build pkg kafka_fdw   # build RPM / DEB packages
```
## Install

You can install kafka_fdw directly. First, make sure the PGDG and PIGSTY repositories are added and enabled:

```bash
pig repo add pgsql -u   # add repo and update cache
```
Install the extension using pig, or with apt/yum/dnf directly:

```bash
pig install kafka_fdw;               # Install for current active PG version
pig ext install -y kafka_fdw -v 18   # PG 18
pig ext install -y kafka_fdw -v 17   # PG 17
pig ext install -y kafka_fdw -v 16   # PG 16
pig ext install -y kafka_fdw -v 15   # PG 15
pig ext install -y kafka_fdw -v 14   # PG 14
```

```bash
dnf install -y kafka_fdw_18   # PG 18
dnf install -y kafka_fdw_17   # PG 17
dnf install -y kafka_fdw_16   # PG 16
dnf install -y kafka_fdw_15   # PG 15
dnf install -y kafka_fdw_14   # PG 14
```

```bash
apt install -y postgresql-18-kafka-fdw   # PG 18
apt install -y postgresql-17-kafka-fdw   # PG 17
apt install -y postgresql-16-kafka-fdw   # PG 16
apt install -y postgresql-15-kafka-fdw   # PG 15
apt install -y postgresql-14-kafka-fdw   # PG 14
```
Create the extension:

```sql
CREATE EXTENSION kafka_fdw;
```
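To confirm the extension is installed, you can query the standard PostgreSQL catalogs:

```sql
-- List the installed kafka_fdw extension and its version
SELECT extname, extversion FROM pg_extension WHERE extname = 'kafka_fdw';
```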
## Usage

kafka_fdw is a foreign data wrapper for Apache Kafka, supporting CSV- and JSON-formatted messages.
### Create Server

```sql
CREATE EXTENSION kafka_fdw;

CREATE SERVER kafka_server FOREIGN DATA WRAPPER kafka_fdw
    OPTIONS (brokers 'localhost:9092');
```
Server Options: brokers (required, comma-separated Kafka broker endpoints).
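Since `brokers` accepts a comma-separated list, a server pointing at a multi-node cluster might look like this (the server name and broker hostnames are illustrative):

```sql
-- Hypothetical three-broker cluster; adjust hostnames and ports to your setup
CREATE SERVER kafka_cluster FOREIGN DATA WRAPPER kafka_fdw
    OPTIONS (brokers 'broker1:9092,broker2:9092,broker3:9092');
```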
### Create User Mapping

```sql
CREATE USER MAPPING FOR PUBLIC SERVER kafka_server;
```
### Create Foreign Table (CSV Format)

```sql
CREATE FOREIGN TABLE kafka_csv (
    part      int    OPTIONS (partition 'true'),
    offs      bigint OPTIONS (offset 'true'),
    some_int  int,
    some_text text,
    some_date date,
    some_time timestamp
)
SERVER kafka_server
OPTIONS (format 'csv', topic 'my_topic', batch_size '30', buffer_delay '100');
```
Two metadata columns are required: one with `partition 'true'` and one with `offset 'true'`. The remaining columns match the message format.

Table Options:

- `format`: message format, `csv` or `json`
- `topic`: Kafka topic name
- `batch_size`: messages fetched per batch
- `buffer_delay`: buffer delay in milliseconds
- `strict`: enforce strict schema validation
- `ignore_junk`: set malformed columns to NULL
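As a sketch of how the error-handling options combine, a table that tolerates malformed messages instead of failing might look like this (the table name, topic name, and option values here are illustrative, not from the original example):

```sql
-- Hypothetical lenient table: malformed columns become NULL rather than errors
CREATE FOREIGN TABLE kafka_lenient (
    part    int    OPTIONS (partition 'true'),
    offs    bigint OPTIONS (offset 'true'),
    payload text
)
SERVER kafka_server
OPTIONS (format 'csv', topic 'noisy_topic', ignore_junk 'true');
```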
### Create Foreign Table (JSON Format)

```sql
CREATE FOREIGN TABLE kafka_json (
    part      int    OPTIONS (partition 'true'),
    offs      bigint OPTIONS (offset 'true'),
    some_int  int    OPTIONS (json 'int_val'),
    some_text text   OPTIONS (json 'text_val')
)
SERVER kafka_server
OPTIONS (format 'json', topic 'my_json_topic', batch_size '30', buffer_delay '100');
```
Use the `json` column option to map column names to JSON keys.
### Consuming Messages

```sql
-- Read from a specific partition and offset
SELECT * FROM kafka_csv WHERE part = 0 AND offs > 1000 LIMIT 60;

-- Read from multiple partitions
SELECT * FROM kafka_csv
WHERE (part = 0 AND offs > 100) OR (part = 1 AND offs > 300);
```
Note: `offset` is a reserved keyword in SQL, which is why the examples name the offset column `offs`; if you do name a column `offset`, double-quote it (`"offset"`) when referencing it.
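For instance, if a table were defined with a column literally named `offset` instead of `offs`, queries would need the quoted identifier (the table name here is hypothetical):

```sql
-- "offset" must be double-quoted because it is a reserved SQL keyword
SELECT * FROM kafka_raw WHERE part = 0 AND "offset" > 1000;
```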
### Producing Messages

```sql
-- Insert with an explicit partition
INSERT INTO kafka_csv (part, some_int, some_text)
VALUES (0, 42, 'hello from partition 0');

-- Insert with automatic partition selection
INSERT INTO kafka_csv (some_int, some_text)
VALUES (42, 'auto-partitioned message');
```
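Multiple messages can also be produced in a single statement, each row becoming one Kafka message (assuming the `kafka_csv` table defined above):

```sql
-- Produce two messages in one INSERT, targeting explicit partitions
INSERT INTO kafka_csv (part, some_int, some_text) VALUES
    (0, 1, 'first message'),
    (1, 2, 'second message');
```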