Matthias Petermann d48c585888 Added test plan
2025-11-27 06:39:33 +01:00
cmd/pulsar-iotbridge Initial import of PoC 2025-11-26 17:22:27 +01:00
internal Initial import of PoC 2025-11-26 17:22:27 +01:00
.gitignore Initial import of PoC 2025-11-26 17:22:27 +01:00
go.mod Initial import of PoC 2025-11-26 17:22:27 +01:00
go.sum Initial import of PoC 2025-11-26 17:22:27 +01:00
LICENSE Initial import of PoC 2025-11-26 17:22:27 +01:00
Makefile Initial import of PoC 2025-11-26 17:22:27 +01:00
README.md Updated Readme 2025-11-27 06:39:03 +01:00
TESTING.MD Added test plan 2025-11-27 06:39:33 +01:00

pulsar-iotbridge

High-performance MQTT → Apache Pulsar IoT bridge, written in Go.

It subscribes to all MQTT topics on a broker and mirrors every incoming message to a Pulsar topic, with:

  • stable, battle-tested client libraries
  • a fixed-size worker pool for publishing
  • a safe, panic-free payload pooling implementation to reduce GC pressure
  • built-in Prometheus metrics on /metrics

1. What this bridge does

At a high level:

  1. Connects to an MQTT broker and subscribes to # (all topics).
  2. For every incoming MQTT message:
    • Copies the payload into a reusable pooled buffer (if ≤ 4 KiB) or into a freshly allocated slice.
    • Pushes a small InMessage struct onto an internal buffered channel.
  3. A configurable pool of worker goroutines:
    • Reads from that channel.
    • Maps the MQTT topic to a Pulsar topic.
    • Publishes to Pulsar using a per-topic producer (created lazily and cached).
  4. Exposes internal metrics for Prometheus.

If the queue is full, messages are intentionally dropped and counted.


2. Topic mapping

MQTT topics map 1:1 to Pulsar topics:

persistent://<tenant>/<namespace>/<mqtt-topic>

Example:

  • MQTT: dt/device123/temperature
  • Pulsar: persistent://public/mqtt_ingest/dt/device123/temperature
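
The mapping above is plain string composition. A minimal sketch, assuming a helper named pulsarTopic (a hypothetical name, not necessarily what the bridge uses internally):

```go
package main

import "fmt"

// pulsarTopic builds the fully qualified Pulsar topic name for an MQTT topic.
// Hypothetical helper for illustration; the MQTT topic is appended unchanged.
func pulsarTopic(tenant, namespace, mqttTopic string) string {
	return "persistent://" + tenant + "/" + namespace + "/" + mqttTopic
}

func main() {
	fmt.Println(pulsarTopic("public", "mqtt_ingest", "dt/device123/temperature"))
}
```

With the tenant and namespace from the example, this yields the Pulsar topic shown above.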

Regex consumers can subscribe to entire trees:

./pulsar-cli consumer \
  --regex \
  -t "persistent://public/mqtt_ingest/dt/.*/.*" \
  -s test

3. Features at a glance

Streaming

  • Subscribes to MQTT #
  • QoS 0 or 1 supported
  • 1:1 MQTT → Pulsar topic mirroring

Resource-safe design

  • 100k-message bounded internal queue
  • Worker-pool based Pulsar publishing
  • Overload = drop messages, don't crash

Efficient Pulsar producers

  • Cached producers per topic
  • Batching (up to 1000 messages, ~5 ms delay)
  • LZ4 compression
  • Large pending buffer (100k)
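
Per-topic producer caching can be sketched with a mutex-guarded map. Everything named here (Producer, ProducerCache, the create callback) is an assumption for illustration; the bridge's real code uses the Pulsar client library and may be structured differently:

```go
package main

import (
	"fmt"
	"sync"
)

// Producer stands in for a Pulsar producer (hypothetical interface).
type Producer interface {
	Topic() string
}

// fakeProducer is a test double so the sketch runs without a Pulsar cluster.
type fakeProducer struct{ topic string }

func (p *fakeProducer) Topic() string { return p.topic }

// ProducerCache creates producers lazily and reuses them per topic.
type ProducerCache struct {
	mu        sync.Mutex
	producers map[string]Producer
	create    func(topic string) (Producer, error)
}

func NewProducerCache(create func(topic string) (Producer, error)) *ProducerCache {
	return &ProducerCache{producers: make(map[string]Producer), create: create}
}

// Get returns the cached producer for topic, creating it on first use.
// Failed creations are not cached, so a later call retries.
func (c *ProducerCache) Get(topic string) (Producer, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if p, ok := c.producers[topic]; ok {
		return p, nil
	}
	p, err := c.create(topic)
	if err != nil {
		return nil, err
	}
	c.producers[topic] = p
	return p, nil
}

func main() {
	created := 0
	cache := NewProducerCache(func(topic string) (Producer, error) {
		created++
		return &fakeProducer{topic: topic}, nil
	})
	cache.Get("persistent://public/mqtt_ingest/a")
	cache.Get("persistent://public/mqtt_ingest/a") // served from the cache
	cache.Get("persistent://public/mqtt_ingest/b")
	fmt.Println("producers created:", created)
}
```

A single mutex keeps the sketch simple. With many workers and many topics, a read-write lock or sync.Map would reduce contention on the hot path.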

Memory pooling

  • Messages ≤ 4 KiB use pooled buffers
  • Larger messages allocate new slices
  • Strict, panic-free pooling logic
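
The pooling rule can be sketched with sync.Pool. This is an illustration under assumptions, not the bridge's actual code: the names maxPooled, copyPayload and release are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

const maxPooled = 4 * 1024 // 4 KiB threshold from the list above

// bufPool holds fixed-size 4 KiB buffers. Storing *[]byte avoids an extra
// allocation each time a slice header is boxed into the pool.
var bufPool = sync.Pool{
	New: func() any {
		b := make([]byte, maxPooled)
		return &b
	},
}

// copyPayload copies src into a pooled buffer when it fits, otherwise into a
// fresh slice. The returned handle is nil for non-pooled payloads.
func copyPayload(src []byte) (data []byte, handle *[]byte) {
	if len(src) > maxPooled {
		return append([]byte(nil), src...), nil
	}
	bp := bufPool.Get().(*[]byte)
	n := copy(*bp, src)
	return (*bp)[:n], bp
}

// release returns a pooled buffer to the pool. A nil handle is a no-op, so
// callers need no special case for large messages.
func release(handle *[]byte) {
	if handle != nil {
		bufPool.Put(handle)
	}
}

func main() {
	data, handle := copyPayload([]byte("21.5"))
	fmt.Printf("pooled=%v payload=%s\n", handle != nil, data)
	release(handle)

	big, handle := copyPayload(make([]byte, maxPooled+1))
	fmt.Printf("pooled=%v len=%d\n", handle != nil, len(big))
	release(handle)
}
```

A payload must not be read after its handle has been released, because the buffer may already be reused for another message.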

Metrics

Exposes per-topic:

  • MQTT messages received
  • Pulsar messages sent
  • In-flight messages
  • Pulsar send errors
  • Dropped messages

Graceful shutdown

  • Closes MQTT
  • Closes Pulsar client and all producers
  • Drains internal channel
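
A signal-driven shutdown might look like the sketch below. The ordering shown (stop intake, drain the queue, then close the sink) is one plausible arrangement and an assumption, not a statement about the bridge's internals. The shutdown function and its callbacks are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// shutdown stops intake, drains whatever is still queued, then closes the sink.
// stopIntake must guarantee that nothing sends on queue after it returns,
// otherwise closing the channel would panic.
func shutdown(queue chan string, stopIntake func(), handle func(string), closeSink func()) int {
	stopIntake()
	close(queue)
	drained := 0
	for m := range queue {
		handle(m)
		drained++
	}
	closeSink()
	return drained
}

func main() {
	// Cancel the context on SIGINT or SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Demo only: also stop after a short timeout so the example terminates.
	ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
	defer cancel()

	queue := make(chan string, 8)
	queue <- "m1"
	queue <- "m2"

	<-ctx.Done()
	n := shutdown(queue,
		func() { fmt.Println("intake stopped") },
		func(m string) { fmt.Println("flushed", m) },
		func() { fmt.Println("sink closed") },
	)
	fmt.Println("drained", n)
}
```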

4. Build and Run

Prerequisites

  • Go 1.22+
  • MQTT broker
  • Pulsar cluster or standalone

Build

make build

Quick start

./pulsar-iotbridge run \
  --mqtt-url tcp://localhost:1883 \
  --mqtt-client-id pulsar-iotbridge \
  --mqtt-qos 0 \
  --pulsar-url pulsar://localhost:6650 \
  --pulsar-tenant public \
  --pulsar-namespace mqtt_ingest \
  --workers 8 \
  --metrics-addr :9090 \
  --log-level info

5. Configuration Reference

Global flags

Flag         Default  Description
--log-level  info     Log level

MQTT flags

Flag              Default               Description
--mqtt-url        tcp://localhost:1883  Broker address
--mqtt-client-id  pulsar-iotbridge      Client ID
--mqtt-qos        0                     QoS level (0 or 1)
--mqtt-username   ""                    Username (optional)
--mqtt-password   ""                    Password (optional)

Pulsar flags

Flag                Default                  Description
--pulsar-url        pulsar://localhost:6650  Pulsar service URL
--pulsar-token      ""                       Auth token
--pulsar-tenant     public                   Tenant
--pulsar-namespace  mqtt_ingest              Namespace

Runtime flags

Flag            Default  Description
--workers       8        Worker goroutines
--metrics-addr  :9090    Metrics endpoint
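
For reference, the flags and defaults above can be expressed with Go's standard flag package. This is a sketch only: the Config struct, the parseRunFlags function and the validation rules are assumptions, and the real CLI may use a different flag library.

```go
package main

import (
	"flag"
	"fmt"
)

// Config mirrors the documented flags. Field names are hypothetical; only the
// flag names and defaults come from the tables above.
type Config struct {
	LogLevel, MQTTURL, MQTTClientID, MQTTUsername, MQTTPassword string
	PulsarURL, PulsarToken, PulsarTenant, PulsarNamespace       string
	MetricsAddr                                                 string
	MQTTQoS, Workers                                            int
}

// parseRunFlags parses the arguments that follow the "run" subcommand.
func parseRunFlags(args []string) (*Config, error) {
	c := &Config{}
	fs := flag.NewFlagSet("run", flag.ContinueOnError)
	fs.StringVar(&c.LogLevel, "log-level", "info", "Log level")
	fs.StringVar(&c.MQTTURL, "mqtt-url", "tcp://localhost:1883", "Broker address")
	fs.StringVar(&c.MQTTClientID, "mqtt-client-id", "pulsar-iotbridge", "Client ID")
	fs.IntVar(&c.MQTTQoS, "mqtt-qos", 0, "QoS 0 or 1")
	fs.StringVar(&c.MQTTUsername, "mqtt-username", "", "Optional")
	fs.StringVar(&c.MQTTPassword, "mqtt-password", "", "Optional")
	fs.StringVar(&c.PulsarURL, "pulsar-url", "pulsar://localhost:6650", "Pulsar service URL")
	fs.StringVar(&c.PulsarToken, "pulsar-token", "", "Auth token")
	fs.StringVar(&c.PulsarTenant, "pulsar-tenant", "public", "Tenant")
	fs.StringVar(&c.PulsarNamespace, "pulsar-namespace", "mqtt_ingest", "Namespace")
	fs.IntVar(&c.Workers, "workers", 8, "Worker goroutines")
	fs.StringVar(&c.MetricsAddr, "metrics-addr", ":9090", "Metrics endpoint")
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	// Assumed validation: the README lists only QoS 0 and 1 as supported.
	if c.MQTTQoS != 0 && c.MQTTQoS != 1 {
		return nil, fmt.Errorf("--mqtt-qos must be 0 or 1, got %d", c.MQTTQoS)
	}
	if c.Workers < 1 {
		return nil, fmt.Errorf("--workers must be at least 1, got %d", c.Workers)
	}
	return c, nil
}

func main() {
	cfg, err := parseRunFlags([]string{"--workers", "16", "--mqtt-qos", "1"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("workers=%d qos=%d namespace=%s\n", cfg.Workers, cfg.MQTTQoS, cfg.PulsarNamespace)
}
```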

6. Backpressure and Dropping

The internal channel:

inCh := make(chan *InMessage, 100_000)

Incoming MQTT messages:

  • Are copied into pooled buffers (≤ 4 KiB) or freshly allocated slices
  • Attempt to enqueue via non-blocking select
  • On full queue → drop + metric increment
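
The non-blocking enqueue can be sketched as a select with a default branch. The InMessage fields, the tryEnqueue name and the global dropped counter are assumptions for illustration; the bridge tracks drops per topic through its metrics.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// InMessage fields are assumed here; the README only names the struct.
type InMessage struct {
	Topic   string
	Payload []byte
}

// dropped counts messages rejected because the queue was full.
var dropped atomic.Int64

// tryEnqueue attempts a non-blocking send. It never blocks the MQTT callback:
// when the channel is full, the message is dropped and counted.
func tryEnqueue(ch chan<- *InMessage, m *InMessage) bool {
	select {
	case ch <- m:
		return true
	default:
		dropped.Add(1)
		return false
	}
}

func main() {
	ch := make(chan *InMessage, 2) // tiny queue to force a drop
	for i := 0; i < 3; i++ {
		ok := tryEnqueue(ch, &InMessage{Topic: "dt/device123/temperature"})
		fmt.Printf("message %d enqueued=%v\n", i, ok)
	}
	fmt.Println("dropped:", dropped.Load())
}
```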

Worker goroutines:

  • Pull from channel
  • Resolve/create Pulsar producer
  • Send via SendAsync
  • Track metrics
  • Return buffers to pools
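
The worker side can be sketched as a fixed number of goroutines ranging over the channel. In this illustration the send callback is a synchronous stand-in for the asynchronous Pulsar send, and runWorkers is a hypothetical name:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// InMessage fields are assumed here; the README only names the struct.
type InMessage struct {
	Topic   string
	Payload []byte
}

// runWorkers starts n workers that consume ch until it is closed, then
// returns how many sends succeeded and how many failed.
func runWorkers(n int, ch <-chan *InMessage, send func(*InMessage) error) (sent, failed int64) {
	var wg sync.WaitGroup
	var okCount, errCount atomic.Int64
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range ch {
				if err := send(m); err != nil {
					errCount.Add(1)
					continue
				}
				okCount.Add(1)
			}
		}()
	}
	wg.Wait()
	return okCount.Load(), errCount.Load()
}

func main() {
	ch := make(chan *InMessage, 100)
	for i := 0; i < 100; i++ {
		ch <- &InMessage{Topic: "dt/device123/temperature", Payload: []byte("21.5")}
	}
	close(ch)

	sent, failed := runWorkers(8, ch, func(m *InMessage) error {
		if len(m.Payload) == 0 {
			return errors.New("empty payload")
		}
		return nil
	})
	fmt.Printf("sent=%d failed=%d\n", sent, failed)
}
```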

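Because the queue is bounded and enqueueing never blocks, memory use stays bounded and the MQTT receive path keeps moving under overload, at the cost of dropped messages.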


7. Metrics

Metrics available on /metrics (Prometheus):

  • mqtt_messages_total{topic=...}
  • pulsar_messages_total{topic=...}
  • pulsar_errors_total{topic=...}
  • inflight_messages{topic=...}
  • dropped_messages_total{topic=...}
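
To show what a per-topic counter looks like on the wire, here is a standard-library sketch that renders lines in the Prometheus text format. It is an illustration only: CounterVec and Render are hypothetical names, label escaping is simplified, and the bridge itself presumably relies on a Prometheus client library.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// CounterVec is a minimal per-topic counter (hypothetical type).
type CounterVec struct {
	name   string
	mu     sync.Mutex
	values map[string]uint64
}

func NewCounterVec(name string) *CounterVec {
	return &CounterVec{name: name, values: make(map[string]uint64)}
}

// Inc adds one to the counter for the given topic label.
func (c *CounterVec) Inc(topic string) {
	c.mu.Lock()
	c.values[topic]++
	c.mu.Unlock()
}

// Render returns one text-format line per topic, sorted by topic for stable output.
func (c *CounterVec) Render() string {
	c.mu.Lock()
	defer c.mu.Unlock()
	topics := make([]string, 0, len(c.values))
	for t := range c.values {
		topics = append(topics, t)
	}
	sort.Strings(topics)
	var b strings.Builder
	for _, t := range topics {
		fmt.Fprintf(&b, "%s{topic=%q} %d\n", c.name, t, c.values[t])
	}
	return b.String()
}

func main() {
	received := NewCounterVec("mqtt_messages_total")
	received.Inc("dt/device123/temperature")
	received.Inc("dt/device123/temperature")
	received.Inc("dt/device456/humidity")
	fmt.Print(received.Render())
}
```

Because every distinct MQTT topic becomes its own label value, the number of time series grows with the number of topics, which is worth keeping in mind for brokers with many devices.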

Prometheus example:

scrape_configs:
  - job_name: 'pulsar-iotbridge'
    static_configs:
      - targets: ['localhost:9090']

8. Operational tips

  • Start MQTT and Pulsar before the bridge
  • Scale workers if Pulsar throughput is the bottleneck
  • Watch the in-flight (inflight_messages) and dropped (dropped_messages_total) metrics
  • SIGINT/SIGTERM triggers full graceful shutdown

9. License

MIT License © 2025 Matthias Petermann