pulse-events
Audience
PLANA staff. The event bus that ties every PLANA service together.
pulse-events is PLANA's event bus: a thin REST API over a Redis Stream that producers publish CloudEvents to and consumer groups read from. It is the spine of cross-service communication — provisioning signals, subscription state changes, agent reports, all flow here.
Stack
| Property | Value |
|---|---|
| Repo | git.planapulse.com/plana-pulse/pulse-events |
| Language | JavaScript ESM |
| Framework | Fastify v5 |
| Backing store | Redis Streams (Valkey 7) |
| Event format | CloudEvents 1.0 |
| Test coverage | 100% |
| Image base | node:20-alpine |
| Namespace | pulse-events |
| Port | 3001 |
| Redis key | PLANA:events, MAXLEN ~50000 |
The contract
PLANA's services publish and consume events on a single Redis Stream called PLANA:events. Every event is a CloudEvents 1.0 envelope:
{
"specversion": "1.0",
"id": "01H8…",
"type": "client.provisioned",
"source": "//plana-pulse/crossplane/planaclient",
"subject": "acme",
"time": "2026-05-29T08:23:14Z",
"datacontenttype": "application/json",
"data": {
"subdomain": "acme",
"projectId": 23,
"tier": "pro",
"odooVersion": "18",
"ownerEmail": "ceo@acme.bg"
}
}
The bus is append-only. There is no retraction or update: every event is immutable. Errors and corrections flow as new events with their own type (e.g. client.provisioning.failed).
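As a rough illustration of the envelope, the sketch below builds a CloudEvents 1.0 object with the same attributes as the sample. `buildEvent` and its parameter names are hypothetical helpers, not part of pulse-events; the id scheme is an assumption (any unique string), and the `reason` field in the usage example is made up for illustration.

```javascript
// Sketch: build a CloudEvents 1.0 envelope shaped like the sample above.
// `buildEvent` is a hypothetical helper, not an API of pulse-events.
const REQUIRED = ["specversion", "id", "type", "source"];

function buildEvent({ type, source, subject, data, id, time }) {
  const event = {
    specversion: "1.0",
    // Assumption: any unique string is acceptable as an id.
    id: id ?? globalThis.crypto.randomUUID(),
    type,
    source,
    time: time ?? new Date().toISOString(),
    datacontenttype: "application/json",
    data,
  };
  if (subject !== undefined) event.subject = subject;

  // CloudEvents 1.0 requires these four attributes to be non-empty strings.
  for (const key of REQUIRED) {
    if (typeof event[key] !== "string" || event[key].length === 0) {
      throw new TypeError(`CloudEvent attribute "${key}" must be a non-empty string`);
    }
  }
  // Shallow freeze: the envelope's top-level attributes cannot be reassigned.
  return Object.freeze(event);
}

// A correction is a new event with its own type, never an edit of an old one.
const failed = buildEvent({
  id: "example-0001", // supplied by the caller in this example
  type: "client.provisioning.failed",
  source: "//plana-pulse/crossplane/planaclient",
  subject: "acme",
  data: { subdomain: "acme", reason: "hypothetical failure reason" },
});
console.log(failed.type, failed.id);
```

Freezing the object mirrors the append-only rule on the producer side: once built, the envelope is published as-is.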
API surface
POST /api/v1/events publish a CloudEvent → Redis XADD
POST /api/v1/events/groups/:group create / ensure a consumer group
GET /api/v1/events/info stream stats (length, last-id, groups)
GET /api/v1/status health summary
GET /healthz liveness
GET /readyz readiness (Redis reachable)
Authentication: X-API-Key header. Producers read the key from SOPS; all of them currently share one path:
| Producer | SOPS path |
|---|---|
| pulse-account-api | pulse_events.publisher_key |
| pulse-billing | same |
| Crossplane Jobs | same |
| pulse-banking | same |
A future hardening pass will split the publisher key per producer. Today there is one shared publisher key, and NetworkPolicies limit which pods can reach pulse-events:3001.
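A producer-side call might look like the sketch below. It assumes the endpoint accepts the JSON-serialized CloudEvent as the request body with `Content-Type: application/json`; `buildPublishRequest`, `publish`, and the `baseUrl` argument are hypothetical names, not taken from the service.

```javascript
// Sketch: assemble and send the request for POST /api/v1/events.
// Assumptions: JSON body, application/json content type. Helper names are hypothetical.
function buildPublishRequest(baseUrl, apiKey, event) {
  if (!apiKey) throw new Error("missing publisher API key");
  return {
    url: new URL("/api/v1/events", baseUrl).toString(),
    init: {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        "X-API-Key": apiKey,
      },
      body: JSON.stringify(event),
    },
  };
}

async function publish(baseUrl, apiKey, event) {
  const { url, init } = buildPublishRequest(baseUrl, apiKey, event);
  const res = await fetch(url, init); // global fetch, available in Node 18+
  if (!res.ok) throw new Error(`publish failed: HTTP ${res.status}`);
  return res;
}
```

Keeping request assembly separate from the network call makes the header and body easy to check without a running service.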
Event types in flight
| Type | Producer | Consumers |
|---|---|---|
| client.provisioned | Crossplane PLANAClient Composition | pulse-notifications (welcome email), pulse-account-api (account-portal cache invalidate) |
| client.provisioning.failed | same | pulse-notifications (error to PLANA staff) |
| subscription.activated | pulse-billing | pulse-account-api, pulse-notifications |
| subscription.cancelled | pulse-billing | same |
| invoice.paid | pulse-billing (Stripe webhook receiver) | pulse-account-api, pulse-notifications |
| deployment.ready | Forgejo CI on docs-portal deploy | none yet (will feed status dashboard) |
| agent.report.ready | ai-agents | pulse-account-api (push to BOS via SSE) |
| agent.tool_call.executed | ai-agents | pulse-account-api (execution log) |
| tenant.upgraded | Crossplane TenantUpgrade Composition | pulse-notifications |
| tenant.backup.completed | per-tenant backup CronJob | pulse-account-api (status badge) |
The list is append-only: we add new types, we don't change the schema of an existing type without a migration plan.
Consumer groups
Each consumer service has its own consumer group:
PLANA:events → consumers:
- group "pulse-notifications" (welcome emails, alerts)
- group "pulse-account-api" (cache invalidation, push to SSE)
- group "pulse-billing-status" (status mirror)
- group "pulse-analytics" (BI ingestion)
Redis tracks each group's position in the stream, and consumers acknowledge processed entries with standard XACK. A consumer that falls behind reads from where it left off; we do not lose events as long as MAXLEN ~50000 trimming has not removed entries the group has yet to read or acknowledge.
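One read, handle, acknowledge cycle could be sketched as follows. This assumes an ioredis-style client exposing `xreadgroup()` and `xack()`, and assumes each stream entry stores the CloudEvent as JSON in a field named `event`; the field layout and the `consumeBatch` helper are illustrative guesses, not the service's documented format.

```javascript
// Sketch of one read/handle/ack cycle for a consumer group.
// Assumptions: ioredis-style client; CloudEvent stored as JSON in field "event".
const STREAM_KEY = "PLANA:events";

// Stream entries arrive as flat [field, value, field, value, ...] arrays.
function fieldsToObject(flat) {
  const out = {};
  for (let i = 0; i + 1 < flat.length; i += 2) out[flat[i]] = flat[i + 1];
  return out;
}

async function consumeBatch(client, { group, consumer, count = 10, blockMs = 5000 }, handler) {
  // ">" asks for entries never delivered to this group before.
  const reply = await client.xreadgroup(
    "GROUP", group, consumer,
    "COUNT", count, "BLOCK", blockMs,
    "STREAMS", STREAM_KEY, ">"
  );
  if (!reply) return 0; // BLOCK timed out with nothing new

  let acked = 0;
  for (const [, entries] of reply) {
    for (const [id, flat] of entries) {
      const event = JSON.parse(fieldsToObject(flat).event);
      await handler(event, id);
      // Ack only after the handler succeeds; if it throws, the entry stays pending.
      await client.xack(STREAM_KEY, group, id);
      acked += 1;
    }
  }
  return acked;
}
```

Acknowledging after the handler, not before, is what gives at-least-once behavior: a crash mid-handler leaves the entry in the group's pending list instead of silently dropping it.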
Why Redis Streams, not Kafka
| Criterion | Redis Streams | Kafka |
|---|---|---|
| Volume today | ~500–2000 events/day | (would handle 10⁶+) |
| Operational complexity | Zero (Redis already deployed) | Significant (Zookeeper-free Kafka still adds 3 brokers) |
| Replay window | MAXLEN ~50,000 (≈ 25–100 days at 500–2,000 events/day) | Configurable retention |
| At-least-once delivery | Yes, via XACK | Yes |
| Order guarantee | Per-stream | Per-partition |
We will revisit Kafka when PLANA:events exceeds ~1M events/day. We are ~3 orders of magnitude away.
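The numbers in this section can be checked with a few lines of arithmetic. At the stated 500 to 2,000 events/day, a cap of 50,000 entries works out to roughly 25 to 100 days of history, and the gap to 1M events/day is about 2.7 to 3.3 orders of magnitude. The function names below are illustrative, and a steady event rate is assumed.

```javascript
// Back-of-envelope numbers from the comparison table, assuming a steady rate.
const MAXLEN = 50_000;

// Days of history kept before MAXLEN trimming catches up.
function replayWindowDays(maxlen, eventsPerDay) {
  return maxlen / eventsPerDay;
}

// How many powers of ten separate the current rate from a target rate.
function ordersOfMagnitude(currentPerDay, targetPerDay) {
  return Math.log10(targetPerDay / currentPerDay);
}

for (const rate of [500, 2000]) {
  console.log(
    `${rate}/day: ~${replayWindowDays(MAXLEN, rate)} days of replay,`,
    `${ordersOfMagnitude(rate, 1_000_000).toFixed(1)} orders of magnitude below 1M/day`
  );
}
```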
Configuration
| Env var | Purpose |
|---|---|
| REDIS_URL | redis://:<password>@redis.redis:6379/0 |
| STREAM_KEY | PLANA:events |
| STREAM_MAXLEN | ~50000 (approximate, with XADD MAXLEN ~) |
| PUBLISHER_API_KEY | accepts requests with this X-API-Key |
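A startup check for these variables might look like the sketch below. It assumes STREAM_MAXLEN holds a plain integer (with the `~` approximate-trimming modifier applied by the service when it calls XADD) and that STREAM_KEY and STREAM_MAXLEN fall back to the values in the table; `loadConfig` and those defaults are assumptions, not confirmed behavior.

```javascript
// Sketch: read the four settings above from an env-like object.
// Assumptions: STREAM_MAXLEN is a plain integer; defaults mirror the table.
function loadConfig(env = process.env) {
  const missing = ["REDIS_URL", "PUBLISHER_API_KEY"].filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`missing required env vars: ${missing.join(", ")}`);
  }
  const maxlen = Number.parseInt(env.STREAM_MAXLEN ?? "50000", 10);
  if (!Number.isInteger(maxlen) || maxlen <= 0) {
    throw new Error(`STREAM_MAXLEN must be a positive integer, got "${env.STREAM_MAXLEN}"`);
  }
  return Object.freeze({
    redisUrl: env.REDIS_URL,
    streamKey: env.STREAM_KEY ?? "PLANA:events",
    streamMaxlen: maxlen,
    publisherApiKey: env.PUBLISHER_API_KEY,
  });
}
```

Failing fast on a missing key or a malformed MAXLEN keeps a misconfigured pod from starting and then rejecting every publish.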
NetworkPolicies
- Ingress: only pods in pulse-account, ai-bos-agent, pulse-billing, crossplane-system, pulse-banking can reach pulse-events:3001
- Egress: pulse-events can reach redis.redis:6379 only
A compromised pulse-events pod therefore cannot move laterally to the cluster's other services.
Operational tasks
See recent events
kubectl -n pulse-events exec deploy/pulse-events -- \
redis-cli -h redis.redis -a "$REDIS_PASSWORD" \
XREVRANGE PLANA:events + - COUNT 20 | head -100
Check consumer lag
kubectl -n pulse-events exec deploy/pulse-events -- \
redis-cli -h redis.redis -a "$REDIS_PASSWORD" \
XINFO CONSUMERS PLANA:events pulse-notifications
The pending column shows unacked entries; a growing pending count means the consumer is lagging.
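The same check can be scripted. The sketch below is a simple threshold test, not a trend detector: it assumes rows in the flat `[field, value, ...]` shape that an ioredis-style client returns for XINFO CONSUMERS, and the threshold of 100 is an arbitrary example value.

```javascript
// Sketch: flag consumers whose pending count exceeds a threshold.
// Assumptions: flat [field, value, ...] rows; threshold is an example value.
function rowToObject(flat) {
  const out = {};
  for (let i = 0; i + 1 < flat.length; i += 2) out[flat[i]] = flat[i + 1];
  return out;
}

function laggingConsumers(rows, maxPending = 100) {
  return rows
    .map(rowToObject)
    .filter((c) => Number(c.pending) > maxPending)
    .map((c) => c.name);
}

// Example rows with made-up consumer names and counts.
console.log(laggingConsumers([
  ["name", "notifier-1", "pending", 3, "idle", 1200],
  ["name", "notifier-2", "pending", 450, "idle", 98000],
]));
```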
Force-replay events from a specific point
If a consumer needs to re-process events from a known good point:
# Create a separate replay group positioned at a specific entry id
kubectl -n pulse-events exec deploy/pulse-events -- \
redis-cli -h redis.redis -a "$REDIS_PASSWORD" \
XGROUP CREATE PLANA:events pulse-notifications-replay <id> MKSTREAM
The replay group consumes from the historical point; the original group keeps its position.
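From code, group creation is usually wrapped so that an existing group is not treated as a failure. The sketch below assumes an ioredis-style client exposing `xgroup()`; `ensureGroup` is a hypothetical helper and is not necessarily how POST /api/v1/events/groups/:group is implemented.

```javascript
// Sketch: create a consumer group at a given entry id, tolerating "already exists".
// Assumption: `client` is an ioredis-style object exposing xgroup().
async function ensureGroup(client, streamKey, group, startId = "$") {
  try {
    await client.xgroup("CREATE", streamKey, group, startId, "MKSTREAM");
    return "created";
  } catch (err) {
    // Redis replies with a BUSYGROUP error when the group name is already taken.
    if (String(err && err.message).includes("BUSYGROUP")) return "exists";
    throw err;
  }
}
```

Note that an "exists" result leaves the group at its current position, so a replay from a new id needs either a fresh group name, as in the command above, or an explicit XGROUP SETID.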
Add a new event type
- Document the type in the table above (this page)
- Add a Zod / JSON Schema in pulse-events/schemas/<type>.js (we validate event payloads at publish time)
- Producer publishes a sample event in a feature branch
- Consumer adds handling for the new type
- Merge to main; the new type flows in production
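To give a feel for the schema step, here is a dependency-free stand-in for what a Zod or JSON Schema definition would express. It is a hypothetical shape for a file such as pulse-events/schemas/client.provisioned.js; the field list and types are inferred from the sample event on this page, not copied from the real schema.

```javascript
// Hypothetical payload check for the client.provisioned type.
// Plain-JS stand-in for a Zod / JSON Schema definition; fields are inferred
// from the sample event, not taken from the real schema.
const fields = {
  subdomain: (v) => typeof v === "string" && v.length > 0,
  projectId: (v) => Number.isInteger(v),
  tier: (v) => typeof v === "string",
  odooVersion: (v) => typeof v === "string",
  ownerEmail: (v) => typeof v === "string" && v.includes("@"),
};

// Returns a list of problems; an empty list means the payload is accepted.
function validateClientProvisioned(data) {
  if (data === null || typeof data !== "object") return ["data must be an object"];
  return Object.entries(fields)
    .filter(([name, check]) => !check(data[name]))
    .map(([name]) => `invalid or missing field: ${name}`);
}

console.log(validateClientProvisioned({ subdomain: "acme", projectId: "23" }));
```

Returning every problem at once, instead of stopping at the first, gives a producer in a feature branch a complete picture from a single rejected publish.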
Health
GET /readyz performs a Redis PING and returns 503 if Redis is unreachable. Alertmanager pages if /readyz returns 503 for 1 minute.
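The readiness decision described above can be sketched as a small function. It assumes `redis` is any client with a promise-returning `ping()`; the 1000 ms timeout and the response body shape are illustrative choices, not the service's actual values.

```javascript
// Sketch of the readiness decision: PING Redis, map the outcome to an HTTP status.
// Assumptions: promise-returning ping(); timeout and body shape are illustrative.
async function checkReadiness(redis, timeoutMs = 1000) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error("Redis PING timed out")), timeoutMs);
  });
  try {
    await Promise.race([redis.ping(), timeout]);
    return { statusCode: 200, body: { ready: true } };
  } catch (err) {
    return { statusCode: 503, body: { ready: false, error: err.message } };
  } finally {
    clearTimeout(timer);
  }
}

// In a Fastify app this could be wired as:
//   app.get("/readyz", async (_req, reply) => {
//     const { statusCode, body } = await checkReadiness(redis);
//     return reply.code(statusCode).send(body);
//   });
```

The timeout matters because a hung Redis connection would otherwise leave the probe waiting instead of reporting 503.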
Where to read more
- pulse-account-api: primary consumer for BOS-facing events
- Architecture → Data stores: Redis capacity and persistence policy
- Crossplane: emits client.provisioned and friends
- Source: infra/k8s/pulse-events/