A lightweight Go service that watches directories and streams completed files into Apache Pulsar topics. Designed for reliable file-to-event pipelines at the edge or in data ingestion systems.

🪶 filecast

filecast is a lightweight Go daemon that watches one or more directories for new or modified files and streams completed files into Apache Pulsar topics. It is designed for high-throughput ingest pipelines, data lakes, and edge systems where file-based workflows need to be published as event streams.


Features

  • 🕵️ Directory watcher using inotify / kqueue (github.com/rjeczalik/notify)
  • 🔄 Recursive mode for nested directory trees
  • 📡 Direct Pulsar producer per file, with optional chunked messages for large payloads
  • 🔐 JWT / token authentication via environment variables
  • 🧾 Message properties automatically include file path, size, hash, and name
  • 💾 xattr tagging (user.published and user.pulsar-final-msgid) after successful publication
  • ⚙️ Configurable tenant, namespace, and base topic
  • 🧩 Command-line interface built with Cobra
  • 🪣 Stateless by design, suitable for container deployment or batch processing

🚀 Quick Start

1. Build

go build -o filecast .

2. Set up the environment

export PULSAR_URL=pulsar://localhost:6650
export PULSAR_JWT="your-jwt-token"   # only needed if the broker requires authentication

3. Run the watcher

./filecast run \
  --watch /data/incoming \
  --recursive \
  --tenant public \
  --namespace default \
  --topic-base files \
  --property env=prod \
  --property source=edge01

Whenever a file is closed after writing or moved into the watched directory, filecast will:

  1. Compute its SHA-256 checksum
  2. Publish the file content as a binary payload to Pulsar
  3. Set extended attributes (when --xattr is enabled, which is the default):
    • user.published=true
    • user.pulsar-final-msgid=<pulsar message id>
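Step 1 can be done without loading the whole file into memory by streaming it through the hash. The sketch below uses only the Go standard library; the helper name fileDigest is hypothetical, and filecast's own main.go may compute the checksum differently (for example, in the same pass that reads the payload for publishing).

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"os"
)

// fileDigest (hypothetical helper) streams the file through SHA-256 so a
// large file is never held in memory just for hashing. It returns the
// hex-encoded digest and the number of bytes read.
func fileDigest(path string) (string, int64, error) {
	f, err := os.Open(path)
	if err != nil {
		return "", 0, err
	}
	defer f.Close()

	h := sha256.New()
	n, err := io.Copy(h, f)
	if err != nil {
		return "", 0, err
	}
	return hex.EncodeToString(h.Sum(nil)), n, nil
}

func main() {
	// Hash every path given on the command line; errors are reported
	// per file so one unreadable file does not stop the others.
	for _, path := range os.Args[1:] {
		sum, size, err := fileDigest(path)
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			continue
		}
		fmt.Printf("%s file.size=%d file.sha256=%s\n", path, size, sum)
	}
}
```

The printed size and digest correspond to the file.size and file.sha256 message properties described under Message Properties.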

🧠 Topic Naming

Each file is published to its own topic:

persistent://<tenant>/<namespace>/<topic-base>/<filename>.stream

Example:

persistent://public/default/files/report_2025-11-10.csv.stream
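A minimal Go sketch of this naming scheme, assuming <filename> is the file's base name with its directories dropped. The function name topicFor is hypothetical.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// topicFor (hypothetical) builds the per-file topic name
// persistent://<tenant>/<namespace>/<topic-base>/<filename>.stream,
// using only the base name of the file path (an assumption).
func topicFor(tenant, namespace, topicBase, path string) string {
	return fmt.Sprintf("persistent://%s/%s/%s/%s.stream",
		tenant, namespace, topicBase, filepath.Base(path))
}

func main() {
	fmt.Println(topicFor("public", "default", "files", "/data/incoming/report_2025-11-10.csv"))
	// persistent://public/default/files/report_2025-11-10.csv.stream
}
```

Under this assumption, two files with the same name in different subdirectories of a recursive watch would publish to the same topic.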

⚙️ Options

| Flag | Description |
|---|---|
| --watch, -w | One or more directories to watch (repeatable) |
| --recursive | Watch directories recursively |
| --tenant | Pulsar tenant (default: public) |
| --namespace | Pulsar namespace (default: default) |
| --topic-base | Base topic prefix (default: files) |
| --enable-chunking, -c | Enable Pulsar native message chunking |
| --xattr | Set extended attributes on success (default: true) |
| --property, -p | Custom message property in key=value format (repeatable) |
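Repeatable key=value flags such as --property have to be turned into a map before they can be attached to messages. This sketch assumes each value is split at the first '=', so values may themselves contain '='; parseProperties is a hypothetical name, and filecast's actual validation rules are not documented here.

```go
package main

import (
	"fmt"
	"strings"
)

// parseProperties (hypothetical) converts repeated "key=value" flag values
// into a map. Splitting on the first '=' keeps any '=' inside the value.
// Later occurrences of the same key overwrite earlier ones.
func parseProperties(pairs []string) (map[string]string, error) {
	props := make(map[string]string, len(pairs))
	for _, p := range pairs {
		k, v, ok := strings.Cut(p, "=")
		if !ok || k == "" {
			return nil, fmt.Errorf("invalid property %q: want key=value", p)
		}
		props[k] = v
	}
	return props, nil
}

func main() {
	props, err := parseProperties([]string{"env=prod", "source=edge01"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(props) // map[env:prod source:edge01]
}
```

With Cobra (which uses spf13/pflag), a repeatable flag like this is commonly declared with StringArrayVarP rather than StringSliceVarP, because the slice variant also splits values on commas; which one filecast uses is not stated.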

🧩 Message Properties

Each published Pulsar message automatically includes:

| Property | Example | Description |
|---|---|---|
| file.path | /data/incoming/report.csv | Full file path |
| file.name | report.csv | Base file name |
| file.size | 24576 | File size in bytes |
| file.sha256 | 4a8b6e… | SHA-256 checksum |
| stream.version | 1 | Version marker |
| user.* | | Any custom --property values |
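The table maps naturally onto a map[string]string, which is also the type of ProducerMessage.Properties in the official Go client (whether filecast uses that client is an assumption). This sketch namespaces each custom --property key under user., which is how the user.* row reads; messageProperties is a hypothetical helper.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strconv"
)

// messageProperties (hypothetical) assembles the properties attached to
// each published message. Custom --property values are prefixed with
// "user." (an assumption based on the user.* row of the table).
func messageProperties(path string, size int64, sha256Hex string, custom map[string]string) map[string]string {
	props := map[string]string{
		"file.path":      path,
		"file.name":      filepath.Base(path),
		"file.size":      strconv.FormatInt(size, 10),
		"file.sha256":    sha256Hex,
		"stream.version": "1",
	}
	for k, v := range custom {
		props["user."+k] = v
	}
	return props
}

func main() {
	// "4a8b6e" is a placeholder, not a real SHA-256 digest.
	props := messageProperties("/data/incoming/report.csv", 24576, "4a8b6e",
		map[string]string{"env": "prod"})
	fmt.Println(props)
}
```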

🔐 Authentication

If PULSAR_JWT is set, the client uses token authentication:

export PULSAR_JWT=$(cat ~/.pulsar/token.jwt)

Otherwise, it connects anonymously to the configured PULSAR_URL.
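The selection logic reduces to two environment lookups. This standard-library sketch takes the lookup function as a parameter so it can be tested without modifying the process environment; clientConfig and configFromEnv are hypothetical names, and treating a missing PULSAR_URL as an error (rather than falling back to a default) is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// clientConfig (hypothetical) is a library-agnostic view of the
// connection settings derived from the environment.
type clientConfig struct {
	URL   string
	Token string // empty means anonymous access
}

// configFromEnv reads PULSAR_URL and PULSAR_JWT through getenv.
// A missing URL is reported as an error (an assumption).
func configFromEnv(getenv func(string) string) (clientConfig, error) {
	cfg := clientConfig{URL: getenv("PULSAR_URL"), Token: getenv("PULSAR_JWT")}
	if cfg.URL == "" {
		return clientConfig{}, errors.New("PULSAR_URL is not set")
	}
	return cfg, nil
}

func main() {
	cfg, err := configFromEnv(os.Getenv)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	if cfg.Token != "" {
		fmt.Println("token authentication against", cfg.URL)
	} else {
		fmt.Println("anonymous connection to", cfg.URL)
	}
}
```

If the service is built on github.com/apache/pulsar-client-go/pulsar (not confirmed here), a non-empty token would typically be passed as Authentication: pulsar.NewAuthenticationToken(cfg.Token) in pulsar.ClientOptions, next to URL: cfg.URL.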


🪶 Example Logs

INFO[0001] Watching recursively: /data/incoming
INFO[0003] published file
       topic=persistent://public/default/files/report.csv.stream
       nativeChunking=false
       finalMsgID="000001-0001-00000001"

🧱 Extended Attributes

After successful publish:

getfattr -d /data/incoming/report.csv
# file: report.csv
user.published="true"
user.pulsar-final-msgid="000001-0001-00000001"

These xattrs can be used by other processes to detect upload status.
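For example, a downstream job could skip files that are already tagged. The sketch below targets Linux only and reads the attributes with the getxattr(2) system call through Go's syscall package. It assumes the raw value of user.published is the string true (getfattr adds the surrounding quotes), and the helper names are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"syscall"
)

// attrGetter returns an extended attribute's value and whether it exists.
type attrGetter func(path, name string) (value string, ok bool, err error)

// getxattr reads one attribute via Linux getxattr(2). The first call with
// an empty buffer asks the kernel for the value's size.
func getxattr(path, name string) (string, bool, error) {
	size, err := syscall.Getxattr(path, name, nil)
	if errors.Is(err, syscall.ENODATA) {
		return "", false, nil // attribute not set
	}
	if err != nil {
		return "", false, err
	}
	buf := make([]byte, size)
	n, err := syscall.Getxattr(path, name, buf)
	if err != nil {
		return "", false, err
	}
	return string(buf[:n]), true, nil
}

// publishedMsgID (hypothetical) reports whether a file carries
// user.published=true and, if so, returns user.pulsar-final-msgid.
func publishedMsgID(get attrGetter, path string) (string, bool, error) {
	v, ok, err := get(path, "user.published")
	if err != nil || !ok || v != "true" {
		return "", false, err
	}
	id, _, err := get(path, "user.pulsar-final-msgid")
	if err != nil {
		return "", false, err
	}
	return id, true, nil
}

func main() {
	for _, path := range os.Args[1:] {
		id, ok, err := publishedMsgID(getxattr, path)
		switch {
		case err != nil:
			fmt.Fprintln(os.Stderr, path, err)
		case ok:
			fmt.Println(path, "published as", id)
		default:
			fmt.Println(path, "not published yet")
		}
	}
}
```

Passing the attribute reader in as a function keeps the status check independent of the filesystem, so it can be exercised with an in-memory fake.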


🧰 Version

./filecast version
dev
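The dev output suggests a version string that defaults to dev in plain source builds. A common Go pattern for this is a package-level string overridden at link time with -ldflags "-X main.version=<tag>". The sketch below shows that pattern as an assumption; the variable name version, and whether the Makefile injects a value this way, are not confirmed.

```go
package main

import (
	"fmt"
	"os"
)

// version defaults to "dev" for a plain `go build`. A release build can
// override it at link time, e.g. (v1.2.3 is a hypothetical tag):
//   go build -ldflags "-X main.version=v1.2.3" -o filecast .
var version = "dev"

func main() {
	if len(os.Args) > 1 && os.Args[1] == "version" {
		fmt.Println(version)
	}
}
```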

📦 Dependencies


🧩 License

MIT License © 2025 Matthias Petermann