fix(sme): wire tenant + billing event dispatchers to NATS (was Redpanda-only, blocking convergence)

The tenant + billing services hardcoded a franz-go Kafka publisher
pointing at REDPANDA_BROKERS. On Sovereigns there is NO Redpanda in
cluster — only NATS JetStream at
nats-jetstream.nats-system.svc.cluster.local:4222 — so every
tenant.created / tenant.deleted / order.placed event was silently
dropped, blocking provisioning + downstream consumers and stalling
the convergence chain end to end.

Per ADR-0001 §6 the canonical event bus is NATS JetStream with
subject convention `catalyst.<domain>.<event>`. This change:

- Adds events.BrokerPublisher + events.MultiPublisher that fan out
  to NATS (`catalyst.<event.Type>` derived from Event.Type) and the
  legacy Redpanda topic in one call. Either transport may be nil;
  the constructor refuses to build a no-op publisher (the exact
  silent-failure mode we just hit).
- Adds NATSConn.PublishEvent so the generic Event envelope can flow
  over the same JetStream connection used for the metering
  subscriber (#798), with Event.ID as the JetStream Msg-Id for
  broker-side de-dup.
- Updates tenant + billing main.go to read NATS_URL +
  REDPANDA_BROKERS independently, construct the appropriate
  transports, and wire MultiPublisher into the Handler. Legacy
  Kafka consumers only start when REDPANDA_BROKERS is non-empty
  so the pods no longer crashloop dialling localhost:9092 on
  Sovereigns.
- Updates chart templates to inject NATS_URL into both tenant and
  billing Deployments. ConfigMap default for NATS_URL on Sovereigns
  is nats://nats-jetstream.nats-system.svc.cluster.local:4222
  (fixes the existing bug where defaults pointed at the wrong
  namespace `nats-jetstream` — NATS actually lives in `nats-system`
  per clusters/_template/bootstrap-kit/07-nats-jetstream.yaml).
- Sovereign default of REDPANDA_BROKERS is now empty (was the wrong
  NATS URL stuffed into a Kafka env, which made franz-go fail every
  dial).
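
The transport-selection rule in the bullets above can be sketched in isolation. This is a minimal illustration only, not the code in this commit: `parseBrokers` and `selectTransports` are hypothetical helpers, and the real services build `events.NATSConn` / `events.Producer` values rather than booleans. It also shows why the raw env value is checked before splitting: in Go, `strings.Split("", ",")` returns a one-element slice containing the empty string, not an empty slice.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// parseBrokers is a hypothetical helper: it splits a comma-separated
// broker list and drops empty or whitespace-only entries.
func parseBrokers(raw string) []string {
	var out []string
	for _, b := range strings.Split(raw, ",") {
		if b = strings.TrimSpace(b); b != "" {
			out = append(out, b)
		}
	}
	return out
}

// selectTransports is a hypothetical helper: it reports which legs would be
// wired for a given NATS_URL / REDPANDA_BROKERS pair and returns an error
// when neither is configured, mirroring the "refuse a no-op publisher" rule.
func selectTransports(natsURL, redpandaBrokers string) (useNATS, useKafka bool, err error) {
	useNATS = strings.TrimSpace(natsURL) != ""
	useKafka = len(parseBrokers(redpandaBrokers)) > 0
	if !useNATS && !useKafka {
		return false, false, errors.New("event bus misconfigured: neither NATS_URL nor REDPANDA_BROKERS set")
	}
	return useNATS, useKafka, nil
}

func main() {
	// Sovereign-style wiring: NATS only, Redpanda intentionally empty.
	n, k, err := selectTransports("nats://nats-jetstream.nats-system.svc.cluster.local:4222", "")
	fmt.Println(n, k, err)

	// Nothing configured: construction is refused instead of dropping events.
	_, _, err = selectTransports("", "")
	fmt.Println(err)
}
```

Failing at startup in the second case is the point of the design: a misconfigured pod crashes visibly rather than accepting requests and discarding their events.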

Subject mapping per CanonicalSubject:
  tenant.created               → catalyst.tenant.created
  tenant.deleted               → catalyst.tenant.deleted
  tenant.app_install_requested → catalyst.tenant.app_install_requested
  order.placed                 → catalyst.billing.order.placed

Test:
- go build ./... in shared/, tenant/, billing/ (clean)
- go test ./events/... ./handlers/... in all three (existing + new
  bridge_test.go pass)
- helm template with global.sovereignFQDN set renders NATS_URL in
  both Deployments + REDPANDA_BROKERS="" in ConfigMap
- helm template without global.sovereignFQDN renders the legacy
  Redpanda broker (Catalyst-Zero contabo path remains intact)

NATS-side consumers for sme.tenant.events / sme.provision.events ship
in a follow-up PR per the ADR-0001 §6 migration plan; this PR only
unblocks the publish leg which is the immediate convergence blocker.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
parent 89a39ceb76
commit 5039444574
@@ -26,8 +26,14 @@ import (

 // Handler holds dependencies for billing HTTP handlers.
 type Handler struct {
-	Store      *store.Store
-	Producer   *events.Producer
+	Store *store.Store
+	// Producer is the broker publisher used to emit billing lifecycle
+	// events (order.placed, payment events). Type is the BrokerPublisher
+	// interface so main.go can wire a MultiPublisher (NATS + Redpanda)
+	// per ADR-0001 §6 — see core/services/shared/events/bridge.go.
+	// On Sovereigns the NATS leg is the authoritative path; the
+	// Redpanda leg is intentionally nil because no Redpanda exists.
+	Producer   events.BrokerPublisher
 	SuccessURL string
 	CancelURL  string
 	CatalogURL string // internal URL to catalog service, e.g. http://catalog.sme.svc.cluster.local:8082
@@ -18,7 +18,10 @@ import (

 func main() {
 	databaseURL := getEnv("DATABASE_URL", "postgres://billing:billing@localhost:5432/billing?sslmode=disable")
-	redpandaBrokers := strings.Split(getEnv("REDPANDA_BROKERS", "localhost:9092"), ",")
+	// REDPANDA_BROKERS — legacy Kafka-protocol bus. Empty on Sovereigns
+	// (no Redpanda exists in cluster); populated on Catalyst-Zero for
+	// backward-compatibility with not-yet-migrated consumers.
+	redpandaBrokersRaw := getEnv("REDPANDA_BROKERS", "")
 	jwtSecret := []byte(getEnv("JWT_SECRET", ""))
 	corsOrigin := getEnv("CORS_ORIGIN", "*")
 	port := getEnv("PORT", "8085")
@@ -35,12 +38,15 @@ func main() {
 	// hardcoded; the chart pipes it from `billing.sovereignFQDN`. Empty is
 	// tolerated for dev loops — the template emits a relative-ish fallback.
 	sovereignFQDN := getEnv("SOVEREIGN_FQDN", "")
-	// NATS_URL — JetStream broker URL for the catalyst.usage.recorded
-	// metering stream (#798). Empty disables the metering subscriber so
-	// developer environments without NATS can still run the legacy
-	// RedPanda consumer + HTTP API. Per #795 [Q-mine-3] this is the
-	// canonical event spine going forward; per ADR-0001 §6 we never
-	// fall back to RedPanda for new metering subjects.
+	// NATS_URL — JetStream broker URL for BOTH:
+	//   (a) the catalyst.usage.recorded metering stream (#798), and
+	//   (b) the canonical billing event bus per ADR-0001 §6
+	//       (`catalyst.billing.order.placed` etc., wired via
+	//       events.MultiPublisher below).
+	// On Sovereigns this is wired to
+	// nats-jetstream.nats-system.svc.cluster.local:4222 by the chart.
+	// Empty disables the NATS leg and the service falls back to the
+	// legacy Redpanda-only Producer (Catalyst-Zero / dev loops).
 	natsURL := getEnv("NATS_URL", "")

 	pg := db.MustConnect(databaseURL)
@@ -57,13 +63,53 @@ func main() {
 	}
 	slog.Info("database migration complete")

-	producer, err := events.NewProducer(redpandaBrokers)
+	// Wire the broker publisher up front so both the metering subscriber
+	// (below) and the Handler share a single NATS connection. ADR-0001 §6
+	// requires NATS for the canonical `catalyst.billing.order.placed`
+	// subject; Redpanda stays in place as a legacy bridge so any
+	// not-yet-migrated consumers (e.g. on contabo) keep receiving
+	// sme.order.events. At least one transport MUST be wired.
+	var (
+		natsConn  *events.NATSConn
+		kafkaProd *events.Producer
+	)
+	if natsURL != "" {
+		nc, err := events.ConnectNATS(natsURL)
+		if err != nil {
+			slog.Error("failed to connect to NATS", "url", natsURL, "error", err)
+			os.Exit(1)
+		}
+		// EnsureUsageStream is idempotent — safe to call on every
+		// startup. sme-billing owns the Stream lifecycle because it is
+		// the canonical consumer; publishers (the NewAPI metering
+		// sidecar) rely on the Stream existing.
+		ensureCtx, ensureCancel := context.WithTimeout(context.Background(), 10*time.Second)
+		if err := nc.EnsureUsageStream(ensureCtx); err != nil {
+			ensureCancel()
+			slog.Error("failed to ensure JetStream catalyst.usage stream", "error", err)
+			os.Exit(1)
+		}
+		ensureCancel()
+		natsConn = nc
+		slog.Info("connected to NATS JetStream", "url", natsURL,
+			"stream", events.StreamCatalystUsage,
+			"subject", events.SubjectUsageRecorded)
+	}
+	if redpandaBrokersRaw != "" {
+		kp, err := events.NewProducer(strings.Split(redpandaBrokersRaw, ","))
+		if err != nil {
+			slog.Error("failed to create RedPanda producer", "error", err)
+			os.Exit(1)
+		}
+		kafkaProd = kp
+		slog.Info("connected to RedPanda (legacy)", "brokers", redpandaBrokersRaw)
+	}
+	producer, err := events.NewMultiPublisher(natsConn, kafkaProd)
 	if err != nil {
-		slog.Error("failed to create events producer", "error", err)
+		slog.Error("event bus misconfigured — neither NATS_URL nor REDPANDA_BROKERS set", "error", err)
 		os.Exit(1)
 	}
 	defer producer.Close()
-	slog.Info("connected to RedPanda")

 	h := &handlers.Handler{
 		Store: billingStore,
@@ -79,57 +125,39 @@ func main() {
 		},
 	}

-	// Start the tenant-events consumer so tenant.deleted cascades clean up
-	// Stripe subs, draft/open invoices, and credit-ledger audit rows. See
-	// issue #94. Runs in a background goroutine; broker outages log + retry.
-	tenantConsumer, err := events.NewConsumer(
-		redpandaBrokers,
-		"billing-tenant-events",
-		[]string{"sme.tenant.events"},
-	)
-	if err != nil {
-		slog.Error("failed to create tenant-events consumer", "error", err)
-		os.Exit(1)
-	}
-	defer tenantConsumer.Close()
-	billingTenantHandler := &handlers.TenantConsumer{Store: billingStore}
-	go func() {
-		if err := billingTenantHandler.Start(context.Background(), tenantConsumer); err != nil {
-			slog.Error("billing tenant-events consumer stopped", "error", err)
-		}
-	}()
-	slog.Info("billing tenant-events consumer started",
-		"topic", "sme.tenant.events", "group", "billing-tenant-events")
-
-	// NATS metering consumer (#798 §B). Per #795 [Q-mine-3] +
-	// ADR-0001 §6, NATS JetStream is the canonical bus for new
-	// subjects; the RedPanda consumer above is legacy and intentionally
-	// left in place for sme.tenant.events. When NATS_URL is unset the
-	// subscriber is skipped — the synchronous HTTP path
-	// (POST /billing/metering/record) still works against the same
-	// store schema, so unit tests + dev loops keep functioning.
-	if natsURL != "" {
-		natsConn, err := events.ConnectNATS(natsURL)
+	// Legacy Kafka tenant-events consumer — only when REDPANDA_BROKERS
+	// is set. On Sovereigns the canonical bus is NATS; the equivalent
+	// NATS-side subscriber ships in a follow-up per ADR-0001 §6
+	// migration plan. Skipping it on NATS-only deployments avoids the
+	// previous crashloop trying to dial "localhost:9092".
+	if kafkaProd != nil {
+		tenantConsumer, err := events.NewConsumer(
+			strings.Split(redpandaBrokersRaw, ","),
+			"billing-tenant-events",
+			[]string{"sme.tenant.events"},
+		)
 		if err != nil {
-			slog.Error("failed to connect to NATS", "url", natsURL, "error", err)
+			slog.Error("failed to create tenant-events consumer", "error", err)
 			os.Exit(1)
 		}
-		// EnsureUsageStream is idempotent — safe to call on every
-		// startup. sme-billing owns the Stream lifecycle because it is
-		// the canonical consumer; publishers (the NewAPI metering
-		// sidecar) rely on the Stream existing.
-		ensureCtx, ensureCancel := context.WithTimeout(context.Background(), 10*time.Second)
-		if err := natsConn.EnsureUsageStream(ensureCtx); err != nil {
-			ensureCancel()
-			slog.Error("failed to ensure JetStream catalyst.usage stream", "error", err)
-			os.Exit(1)
-		}
-		ensureCancel()
-		defer natsConn.Close()
-		slog.Info("connected to NATS JetStream", "url", natsURL,
-			"stream", events.StreamCatalystUsage,
-			"subject", events.SubjectUsageRecorded)
+		defer tenantConsumer.Close()
+		billingTenantHandler := &handlers.TenantConsumer{Store: billingStore}
+		go func() {
+			if err := billingTenantHandler.Start(context.Background(), tenantConsumer); err != nil {
+				slog.Error("billing tenant-events consumer stopped", "error", err)
+			}
+		}()
+		slog.Info("billing tenant-events consumer started",
+			"topic", "sme.tenant.events", "group", "billing-tenant-events")
+	} else {
+		slog.Info("REDPANDA_BROKERS empty — legacy Kafka tenant-events consumer disabled (NATS-only mode)")
+	}

+	// NATS metering subscriber (#798 §B) — uses the same NATS connection
+	// opened for the publisher above so we have a single TCP socket per
+	// process. When NATS_URL is unset the subscriber is skipped (HTTP
+	// path POST /billing/metering/record still works for dev loops).
+	if natsConn != nil {
 		subCtx := context.Background()
 		usageSub, err := natsConn.SubscribeUsageRecorded(subCtx)
 		if err != nil {
164
core/services/shared/events/bridge.go
Normal file
@@ -0,0 +1,164 @@
+package events
+
+// Per ADR-0001 §6, the OpenOva canonical event spine is NATS JetStream with
+// subject convention `catalyst.<domain>.<event>`. Historically the SME
+// services (tenant, billing, notification, provisioning) emitted to
+// Redpanda topics under the legacy `sme.<producer>.events` shape because
+// the bus was Redpanda on contabo. On Sovereigns there IS no Redpanda —
+// only NATS at nats-jetstream.nats-system.svc.cluster.local:4222 — so
+// every event the tenant + billing services published was silently lost,
+// blocking convergence (provisioning never received tenant.created /
+// order.placed, downstream subscribers never saw tenant.deleted).
+//
+// This file introduces a thin BrokerPublisher abstraction so the existing
+// HTTP handlers can keep calling a single Publish method:
+//
+//   - On Sovereigns NATS_URL is set and Publish writes to JetStream
+//     subjects (catalyst.<event.Type>) per the canonical convention.
+//   - On legacy contabo the REDPANDA_BROKERS env is non-empty (and may
+//     point at a real Redpanda) — Publish ALSO writes to the legacy
+//     sme.<producer>.events topic so any not-yet-migrated consumers
+//     keep receiving events.
+//   - Both can run together. NATS wins for new subscribers; Redpanda
+//     keeps legacy consumers alive during the migration window.
+//
+// Subject derivation: each call site already knows the legacy Kafka topic
+// it wants to write to AND the event.Type (e.g. "tenant.created"). The
+// canonical NATS subject is `catalyst.<event.Type>`. The bridge handles
+// both publishes inside one method so call sites don't grow conditional
+// branches everywhere.
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"log/slog"
+	"strings"
+)
+
+// BrokerPublisher is the unified publish surface every SME service uses
+// for outbound domain events. Implementations:
+//
+//   - MultiPublisher (this file): fans out to NATS + optional Redpanda.
+//   - Producer (events.go): legacy Kafka-only path retained for tests
+//     and the contabo-only fallback.
+//
+// The interface intentionally matches Producer.Publish so call sites can
+// migrate by changing the field type without touching every Publish call.
+type BrokerPublisher interface {
+	// Publish writes the event to BOTH the canonical NATS subject
+	// (derived from event.Type as catalyst.<event.Type>) AND, if the
+	// implementation was configured with Kafka brokers, the legacy
+	// kafkaTopic. Errors are best-effort — NATS failures bubble up so
+	// the caller can log/observe, Kafka failures are logged but do not
+	// fail the call (the legacy bus is non-authoritative on Sovereigns).
+	Publish(ctx context.Context, kafkaTopic string, event *Event) error
+	// Close releases the underlying connections.
+	Close()
+}
+
+// MultiPublisher fans an event out to NATS (canonical) + Redpanda (legacy).
+// Either side may be nil:
+//
+//   - nats == nil: pure-Kafka publisher (matches legacy contabo behaviour
+//     when NATS_URL is empty).
+//   - kafka == nil: pure-NATS publisher (matches the canonical Sovereign
+//     wiring where REDPANDA_BROKERS is intentionally empty).
+//
+// At construction time NewMultiPublisher refuses to return a publisher
+// with both sides nil — that would silently drop every event and is the
+// exact failure mode that blocked convergence in the first place.
+type MultiPublisher struct {
+	nats  *NATSConn
+	kafka *Producer
+}
+
+// NewMultiPublisher constructs a publisher wired to the supplied NATS
+// connection and/or Kafka producer. Pass nil for either side to disable
+// that transport.
+//
+// Returns an error if BOTH transports are nil — a publisher with no
+// outputs is always a configuration bug (the events would be silently
+// dropped, which is what got us into this mess).
+func NewMultiPublisher(nc *NATSConn, kp *Producer) (*MultiPublisher, error) {
+	if nc == nil && kp == nil {
+		return nil, errors.New("events: NewMultiPublisher requires at least one of NATS or Kafka")
+	}
+	return &MultiPublisher{nats: nc, kafka: kp}, nil
+}
+
+// CanonicalSubject returns the canonical NATS JetStream subject for an
+// event.Type per ADR-0001 §6 (`catalyst.<event.Type>`). Exported so
+// tests + alternate publishers can derive the same name without
+// duplicating the convention.
+//
+// Examples:
+//
+//	"tenant.created"               → "catalyst.tenant.created"
+//	"order.placed"                 → "catalyst.billing.order.placed"
+//	"tenant.app_install_requested" → "catalyst.tenant.app_install_requested"
+//
+// The `order.placed` → `catalyst.billing.order.placed` mapping mirrors
+// the rule the user spelled out: billing's order events live under the
+// `billing` domain even though the event.Type itself does not contain
+// "billing" (the producer-side domain is implied by the service name).
+func CanonicalSubject(eventType string) string {
+	// Empty or already-canonical event types are passed through as-is.
+	if eventType == "" {
+		return ""
+	}
+	if strings.HasPrefix(eventType, "catalyst.") {
+		return eventType
+	}
+	// Billing's order events sit under the billing domain by convention.
+	// Without this special-case `order.placed` would become
+	// `catalyst.order.placed` which mixes billing concerns with the
+	// `order` domain that other services may later own (refunds,
+	// disputes). Naming bug we deliberately don't ship.
+	if strings.HasPrefix(eventType, "order.") {
+		return "catalyst.billing." + eventType
+	}
+	return "catalyst." + eventType
+}
+
+// Publish writes the event to NATS (canonical) and Kafka (legacy) if
+// either transport is configured. NATS publish errors are returned;
+// Kafka publish errors are logged but swallowed so a stale legacy
+// broker cannot break a healthy Sovereign.
+func (m *MultiPublisher) Publish(ctx context.Context, kafkaTopic string, event *Event) error {
+	if event == nil {
+		return errors.New("events: Publish nil event")
+	}
+	subject := CanonicalSubject(event.Type)
+
+	var natsErr error
+	if m.nats != nil && subject != "" {
+		natsErr = m.nats.PublishEvent(ctx, subject, event)
+		if natsErr != nil {
+			// Surface so the HTTP handler's slog.Error captures it.
+			natsErr = fmt.Errorf("nats publish %s: %w", subject, natsErr)
+		}
+	}
+
+	if m.kafka != nil && kafkaTopic != "" {
+		if err := m.kafka.Publish(ctx, kafkaTopic, event); err != nil {
+			// Legacy bus failures should NEVER fail a Sovereign-side
+			// call — log + continue. Returning would block requests on
+			// a Redpanda outage that is irrelevant to Sovereign flows.
+			slog.Warn("events: legacy kafka publish failed",
+				"topic", kafkaTopic, "type", event.Type, "error", err)
+		}
+	}
+
+	return natsErr
+}
+
+// Close releases NATS + Kafka connections (whichever are non-nil).
+func (m *MultiPublisher) Close() {
+	if m.nats != nil {
+		m.nats.Close()
+	}
+	if m.kafka != nil {
+		m.kafka.Close()
+	}
+}
45
core/services/shared/events/bridge_test.go
Normal file
@@ -0,0 +1,45 @@
+package events
+
+import "testing"
+
+// TestCanonicalSubject pins ADR-0001 §6 (`catalyst.<domain>.<event>`)
+// — every existing call site relies on the mapping below so the
+// fan-out publisher writes to the correct JetStream subject.
+func TestCanonicalSubject(t *testing.T) {
+	cases := []struct {
+		in, want string
+	}{
+		// Tenant lifecycle events the tenant service emits.
+		{"tenant.created", "catalyst.tenant.created"},
+		{"tenant.deleted", "catalyst.tenant.deleted"},
+		{"tenant.app_install_requested", "catalyst.tenant.app_install_requested"},
+		{"tenant.app_uninstall_requested", "catalyst.tenant.app_uninstall_requested"},
+		// Billing event — the only `order.*` producer today is billing,
+		// so the canonical subject puts it under the billing domain
+		// (see CanonicalSubject for the rationale).
+		{"order.placed", "catalyst.billing.order.placed"},
+		// Already-canonical subjects pass through.
+		{"catalyst.usage.recorded", "catalyst.usage.recorded"},
+		// Empty in → empty out (NATS leg skipped).
+		{"", ""},
+		// Generic platform event.
+		{"domain.verified", "catalyst.domain.verified"},
+	}
+	for _, c := range cases {
+		got := CanonicalSubject(c.in)
+		if got != c.want {
+			t.Errorf("CanonicalSubject(%q) = %q, want %q", c.in, got, c.want)
+		}
+	}
+}
+
+// TestNewMultiPublisherRequiresAtLeastOne pins the safety check —
+// silently constructing a no-op publisher is the exact failure mode
+// that blocked Sovereign convergence (events were dropped on the floor
+// when REDPANDA_BROKERS pointed at a non-existent broker and NATS_URL
+// was unset).
+func TestNewMultiPublisherRequiresAtLeastOne(t *testing.T) {
+	if _, err := NewMultiPublisher(nil, nil); err == nil {
+		t.Fatal("NewMultiPublisher(nil, nil) should return an error")
+	}
+}
@@ -204,6 +204,47 @@ func (c *NATSConn) PublishUsage(ctx context.Context, payload UsageRecordedPayloa
 	return nil
 }

+// PublishEvent publishes a generic domain Event envelope (events.go) on
+// the supplied JetStream subject. Used by MultiPublisher (bridge.go) to
+// route the same tenant.created / order.placed / tenant.deleted events
+// that legacy code emitted to Redpanda onto the canonical
+// `catalyst.<event.Type>` NATS subjects per ADR-0001 §6.
+//
+// Event.ID is set as the JetStream Msg-Id so duplicate publishes inside
+// the broker's de-dup window collapse at the broker — gives the
+// at-least-once HTTP retry path an idempotency guarantee end-to-end.
+//
+// This deliberately does NOT require any per-subject Stream to exist:
+// JetStream brokers configured with the default `catalyst.>` Stream (or
+// a per-domain Stream listening on `catalyst.tenant.>` /
+// `catalyst.billing.>`) will pick the message up automatically. If no
+// matching Stream exists the broker returns "no responders" which
+// surfaces as an error here — surfacing the misconfiguration is correct,
+// silently dropping events is what got us into this mess.
+func (c *NATSConn) PublishEvent(ctx context.Context, subject string, event *Event) error {
+	if c == nil || c.js == nil {
+		return errors.New("events: nats publisher not initialised")
+	}
+	if subject == "" {
+		return errors.New("events: PublishEvent requires a subject")
+	}
+	if event == nil {
+		return errors.New("events: PublishEvent nil event")
+	}
+	body, err := json.Marshal(event)
+	if err != nil {
+		return fmt.Errorf("events: marshal event: %w", err)
+	}
+	opts := []jetstream.PublishOpt{}
+	if event.ID != "" {
+		opts = append(opts, jetstream.WithMsgID(event.ID))
+	}
+	if _, err := c.js.Publish(ctx, subject, body, opts...); err != nil {
+		return fmt.Errorf("events: publish %s: %w", subject, err)
+	}
+	return nil
+}
+
 // Close shuts down the underlying NATS connection.
 func (c *NATSConn) Close() {
 	if c.nc != nil {
@@ -33,8 +33,15 @@ func validTenantSlug(s string) bool {

 // Handler holds dependencies for tenant HTTP handlers.
 type Handler struct {
-	Store    *store.Store
-	Producer *events.Producer
+	Store *store.Store
+	// Producer is the broker publisher used to emit tenant lifecycle
+	// events (tenant.created, tenant.deleted, tenant.app_install_requested,
+	// tenant.app_uninstall_requested). Type is the BrokerPublisher
+	// interface so main.go can wire a MultiPublisher (NATS + Redpanda)
+	// per ADR-0001 §6 — see core/services/shared/events/bridge.go for
+	// why the legacy Redpanda-only Producer was insufficient on
+	// Sovereigns (no Redpanda exists there).
+	Producer events.BrokerPublisher
 	// Catalog is optional; when unset the day-2 app install/uninstall
 	// endpoints return 501. Provisioning-time creation does not need it
 	// because the marketplace already validated capacity at checkout.
@@ -23,7 +23,16 @@ func main() {
 	// Configuration from environment.
 	mongoURI := getEnv("MONGODB_URI", "mongodb://ferretdb:27017")
 	mongoDBName := getEnv("MONGODB_DB", "tenants")
-	redpandaBrokers := strings.Split(getEnv("REDPANDA_BROKERS", "localhost:9092"), ",")
+	// NATS_URL — canonical event bus per ADR-0001 §6. On Sovereigns this
+	// is wired to nats-jetstream.nats-system.svc.cluster.local:4222 by
+	// the chart. Empty disables the NATS leg and the service falls back
+	// to the legacy Redpanda-only Producer (Catalyst-Zero / dev loops).
+	natsURL := getEnv("NATS_URL", "")
+	// REDPANDA_BROKERS — legacy Kafka-protocol bus retained for
+	// backward-compatibility with not-yet-migrated consumers. Empty
+	// disables the Redpanda leg entirely. On Sovereigns this is left
+	// empty (no Redpanda exists in cluster).
+	redpandaBrokersRaw := getEnv("REDPANDA_BROKERS", "")
 	jwtSecret := []byte(getEnv("JWT_SECRET", ""))
 	corsOrigin := getEnv("CORS_ORIGIN", "*")
 	catalogURL := getEnv("CATALOG_URL", "http://catalog.sme.svc.cluster.local:8082")
@@ -50,14 +59,39 @@ func main() {
 	}()
 	slog.Info("connected to FerretDB", "uri", mongoURI, "db", mongoDBName)

-	// Create events producer.
-	producer, err := events.NewProducer(redpandaBrokers)
+	// Wire the broker publisher. NATS is the canonical bus on Sovereigns
+	// (publish to `catalyst.tenant.<event>` per ADR-0001 §6); Redpanda
+	// is the legacy bus retained for backward-compatibility on
+	// Catalyst-Zero. At least one MUST be configured or NewMultiPublisher
+	// refuses to construct a silently-no-op publisher.
+	var (
+		natsConn  *events.NATSConn
+		kafkaProd *events.Producer
+	)
+	if natsURL != "" {
+		nc, err := events.ConnectNATS(natsURL)
+		if err != nil {
+			slog.Error("failed to connect to NATS", "url", natsURL, "error", err)
+			os.Exit(1)
+		}
+		natsConn = nc
+		slog.Info("connected to NATS JetStream", "url", natsURL)
+	}
+	if redpandaBrokersRaw != "" {
+		kp, err := events.NewProducer(strings.Split(redpandaBrokersRaw, ","))
+		if err != nil {
+			slog.Error("failed to create RedPanda producer", "error", err)
+			os.Exit(1)
+		}
+		kafkaProd = kp
+		slog.Info("connected to RedPanda (legacy)", "brokers", redpandaBrokersRaw)
+	}
+	producer, err := events.NewMultiPublisher(natsConn, kafkaProd)
 	if err != nil {
-		slog.Error("failed to create events producer", "error", err)
+		slog.Error("event bus misconfigured — neither NATS_URL nor REDPANDA_BROKERS set", "error", err)
 		os.Exit(1)
 	}
 	defer producer.Close()
-	slog.Info("connected to RedPanda")

 	// Initialize store and handler.
 	tenantStore := store.New(client, mongoDBName)
@@ -72,42 +106,53 @@ func main() {
 	slog.Info("catalog client configured", "url", catalogURL)
 	slog.Info("provisioning URL configured", "url", provisioningURL)

-	// Subscribe to provision events so tenant status reflects provisioning outcome.
-	provConsumer, err := events.NewConsumer(redpandaBrokers, "tenant-service", []string{"sme.provision.events"})
-	if err != nil {
-		slog.Error("failed to create provision consumer", "error", err)
-		os.Exit(1)
-	}
-	defer provConsumer.Close()
-	consumerHandler := &handlers.ConsumerHandler{Store: tenantStore}
-	go func() {
-		if err := consumerHandler.Start(context.Background(), provConsumer); err != nil {
-			slog.Error("provision consumer stopped", "error", err)
+	// Legacy Kafka consumers (sme.provision.events + sme.tenant.events) are
+	// only started when REDPANDA_BROKERS is set. On Sovereigns the canonical
+	// path is NATS JetStream; the equivalent NATS-side subscribers are
+	// scheduled to ship in a follow-up (see ADR-0001 §6 migration plan).
+	// Keeping these legacy goroutines off when no Kafka broker is wired
+	// avoids the previous behaviour of crashlooping the pod trying to
+	// dial "localhost:9092".
+	if redpandaBrokersRaw != "" {
+		redpandaBrokers := strings.Split(redpandaBrokersRaw, ",")
+		provConsumer, err := events.NewConsumer(redpandaBrokers, "tenant-service", []string{"sme.provision.events"})
+		if err != nil {
+			slog.Error("failed to create provision consumer", "error", err)
+			os.Exit(1)
 		}
-	}()
+		defer provConsumer.Close()
+		consumerHandler := &handlers.ConsumerHandler{Store: tenantStore}
+		go func() {
+			if err := consumerHandler.Start(context.Background(), provConsumer); err != nil {
+				slog.Error("provision consumer stopped", "error", err)
+			}
+		}()

-	// Members-cleanup consumer — purges member rows as soon as a tenant is
-	// soft-deleted so authz checks during the teardown window don't see
-	// stale membership. Separate consumer group so offsets don't contend
-	// with the provision-events subscriber above. See issue #96.
-	membersConsumer, err := events.NewConsumer(
-		redpandaBrokers,
-		"tenant-members-cleanup",
-		[]string{"sme.tenant.events"},
-	)
-	if err != nil {
-		slog.Error("failed to create members-cleanup consumer", "error", err)
-		os.Exit(1)
-	}
-	defer membersConsumer.Close()
-	membersCleanup := &handlers.MembersCleanupConsumer{Store: tenantStore}
-	go func() {
-		if err := membersCleanup.Start(context.Background(), membersConsumer); err != nil {
-			slog.Error("tenant members-cleanup consumer stopped", "error", err)
+		// Members-cleanup consumer — purges member rows as soon as a tenant is
+		// soft-deleted so authz checks during the teardown window don't see
+		// stale membership. Separate consumer group so offsets don't contend
+		// with the provision-events subscriber above. See issue #96.
+		membersConsumer, err := events.NewConsumer(
+			redpandaBrokers,
+			"tenant-members-cleanup",
+			[]string{"sme.tenant.events"},
+		)
+		if err != nil {
+			slog.Error("failed to create members-cleanup consumer", "error", err)
+			os.Exit(1)
 		}
-	}()
-	slog.Info("tenant members-cleanup consumer started",
-		"topic", "sme.tenant.events", "group", "tenant-members-cleanup")
+		defer membersConsumer.Close()
+		membersCleanup := &handlers.MembersCleanupConsumer{Store: tenantStore}
+		go func() {
+			if err := membersCleanup.Start(context.Background(), membersConsumer); err != nil {
+				slog.Error("tenant members-cleanup consumer stopped", "error", err)
+			}
+		}()
+		slog.Info("tenant members-cleanup consumer started",
+			"topic", "sme.tenant.events", "group", "tenant-members-cleanup")
+	} else {
+		slog.Info("REDPANDA_BROKERS empty — legacy Kafka consumers disabled (NATS-only mode)")
+	}

 	// Build the main mux.
 	mux := http.NewServeMux()
@@ -52,6 +52,18 @@ spec:
                 configMapKeyRef:
                   name: sme-services-config
                   key: REDPANDA_BROKERS
+            # NATS JetStream URL — canonical event bus per ADR-0001 §6.
+            # Used for BOTH (a) the existing catalyst.usage.recorded
+            # metering subscriber (#798) and (b) the new canonical
+            # `catalyst.billing.order.placed` publish path (see
+            # events.MultiPublisher). On Sovereigns the ConfigMap
+            # default resolves to
+            # nats://nats-jetstream.nats-system.svc.cluster.local:4222.
+            - name: NATS_URL
+              valueFrom:
+                configMapKeyRef:
+                  name: sme-services-config
+                  key: NATS_URL
             - name: JWT_SECRET
               valueFrom:
                 secretKeyRef:

@@ -31,11 +31,41 @@ the env to EVENT_BUS_BROKERS once every SME service binary reads the new
 name; that rename can ship without re-touching this template because the
 key name is the only thing this template promises to its consumers.
 */ -}}
+{{- /*
+Convergence fix 2026-05-18 — Sovereign event bus wiring:
+
+  (1) Sovereign default of REDPANDA_BROKERS is now EMPTY (not the NATS URL
+      poked into a Kafka env). The SME services' Kafka client cannot speak
+      the NATS wire protocol; the previous default value of
+      `nats-jetstream.nats-jetstream.svc.cluster.local:4222` made every
+      franz-go dial fail at startup, which on the publish side silently
+      dropped tenant.created / order.placed and blocked convergence end
+      to end.
+  (2) NATS_URL is now exported as a separate ConfigMap key and wired into
+      the tenant + billing Deployments (see tenant.yaml / billing.yaml).
+      Services use the JetStream-native MultiPublisher (see
+      core/services/shared/events/bridge.go) to write to
+      `catalyst.<domain>.<event>` per ADR-0001 §6.
+  (3) Note also the NATS namespace: `bp-nats-jetstream` reconciles into
+      namespace `nats-system` (see clusters/_template/bootstrap-kit/
+      07-nats-jetstream.yaml), NOT `nats-jetstream`. Earlier defaults that
+      pointed at `.nats-jetstream.svc` resolved to NXDOMAIN.
+
+Catalyst-Zero (contabo, global.sovereignFQDN empty) keeps the legacy
+Redpanda default so contabo's existing SME consumers keep functioning.
+*/ -}}
 {{- $defaultBrokers := "redpanda.talentmesh.svc.cluster.local:9092" -}}
+{{- $defaultNATS := "" -}}
 {{- if .Values.global.sovereignFQDN -}}
-{{- $defaultBrokers = "nats-jetstream.nats-jetstream.svc.cluster.local:4222" -}}
+{{- /* Sovereign default: no Redpanda exists in cluster — leave empty
+       so the SME services skip the legacy Kafka transport entirely. */ -}}
+{{- $defaultBrokers = "" -}}
+{{- /* Sovereign default NATS URL. Correct namespace is `nats-system`
+       (see header comment §3). */ -}}
+{{- $defaultNATS = "nats://nats-jetstream.nats-system.svc.cluster.local:4222" -}}
 {{- end -}}
 {{- $brokers := default $defaultBrokers (index (default (dict) .Values.smeServices.eventBus) "brokers") -}}
+{{- $natsURL := default $defaultNATS (index (default (dict) .Values.smeServices.eventBus) "natsURL") -}}
 apiVersion: v1
 kind: ConfigMap
 metadata:
@@ -81,8 +111,17 @@ data:
   POSTGRES_PORT: "{{ .Values.smeServices.ferretdb.postgresPort | default 5432 }}"

   # ---- event bus ------------------------------------------------------------
-  # See header comment for the topology-aware default selection (issue #942).
+  # See header comment for the topology-aware default selection (issue #942
+  # + 2026-05-18 convergence fix). On Sovereigns REDPANDA_BROKERS is
+  # intentionally empty — the SME services treat empty as "skip the
+  # legacy Kafka transport" (NATS-only mode).
   REDPANDA_BROKERS: {{ $brokers | quote }}
+  # NATS JetStream URL — canonical event bus per ADR-0001 §6. Wired
+  # into tenant + billing Deployments (and any future SME service
+  # publishing `catalyst.<domain>.<event>` subjects via
+  # events.MultiPublisher). Empty on Catalyst-Zero (no NATS yet);
+  # populated on Sovereigns by the default above.
+  NATS_URL: {{ $natsURL | quote }}
   # Protocol hint for SME services that want to switch wire formats per
   # broker without re-reading the URL — `kafka` for Redpanda (legacy
   # contabo), `nats` for NATS JetStream (Sovereign default). Operator-

@@ -37,6 +37,16 @@ spec:
                 configMapKeyRef:
                   name: sme-services-config
                   key: REDPANDA_BROKERS
+            # NATS JetStream URL — canonical event bus per ADR-0001 §6.
+            # On Sovereigns the ConfigMap default resolves to
+            # nats://nats-jetstream.nats-system.svc.cluster.local:4222
+            # (see configmap.yaml header). Service publishes
+            # `catalyst.tenant.<event>` subjects via events.MultiPublisher.
+            - name: NATS_URL
+              valueFrom:
+                configMapKeyRef:
+                  name: sme-services-config
+                  key: NATS_URL
             - name: JWT_SECRET
               valueFrom:
                 secretKeyRef:

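The subject convention referenced in the hunk above (`catalyst.tenant.<event>`, with the commit message giving tenant.created → catalyst.tenant.created) can be illustrated with a small sketch. The commit names a CanonicalSubject mapping, but its actual signature and edge-case behaviour are not shown here, so the function below is an assumption: the error return and the rejection of empty types are illustrative choices, not the author's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// subjectPrefix is the root of the NATS subject convention
// `catalyst.<domain>.<event>` cited from ADR-0001 §6 in the commit message.
const subjectPrefix = "catalyst."

// CanonicalSubject derives a NATS subject from an event type by prefixing it.
// Sketch only: rejecting an empty type is an assumption made here so that a
// malformed event cannot be published to the bare prefix.
func CanonicalSubject(eventType string) (string, error) {
	eventType = strings.TrimSpace(eventType)
	if eventType == "" {
		return "", errors.New("event type is empty")
	}
	return subjectPrefix + eventType, nil
}

func main() {
	for _, typ := range []string{"tenant.created", "tenant.deleted", ""} {
		subject, err := CanonicalSubject(typ)
		fmt.Printf("type=%q subject=%q err=%v\n", typ, subject, err)
	}
}
```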
@@ -600,8 +600,10 @@ services:
       # k8scache.Factory).
       clusterID: ""
     nats:
-      # In-cluster NATS JetStream Service URL.
-      url: "nats://nats-jetstream.nats-jetstream.svc.cluster.local:4222"
+      # In-cluster NATS JetStream Service URL. Namespace is `nats-system`
+      # (see clusters/_template/bootstrap-kit/07-nats-jetstream.yaml) —
+      # previous default of `.nats-jetstream.svc` was an NXDOMAIN bug.
+      url: "nats://nats-jetstream.nats-system.svc.cluster.local:4222"
       stream: "catalyst.events"
       subject: "catalyst.events.>"
     valkey:
@@ -921,8 +923,11 @@ smeServices:
   # services still target a Redpanda Service in the talentmesh namespace
   # (migration #68). The configmap.yaml template selects the default at
   # render time based on .Values.global.sovereignFQDN:
-  #   - non-empty (Sovereign) → nats-jetstream.nats-jetstream.svc:4222
-  #   - empty (Catalyst-Zero) → redpanda.talentmesh.svc:9092
+  #   - non-empty (Sovereign) → REDPANDA_BROKERS="" + NATS_URL=
+  #       nats://nats-jetstream.nats-system.svc.cluster.local:4222
+  #       (services run in NATS-only mode per ADR-0001 §6)
+  #   - empty (Catalyst-Zero) → REDPANDA_BROKERS=redpanda.talentmesh.svc:9092
+  #       (legacy Kafka transport remains the bus on contabo)
   # `brokers` overrides the default outright — operator MAY wire any
   # NATS-protocol or Kafka-protocol broker without forking the chart.
   # `protocol` is an explicit hint for SME services that want to switch
@@ -931,6 +936,13 @@ smeServices:
   eventBus:
     brokers: ""
     protocol: ""
+    # Canonical NATS JetStream URL per ADR-0001 §6. Empty = use the
+    # topology default from configmap.yaml (Sovereign:
+    # nats://nats-jetstream.nats-system.svc.cluster.local:4222 ;
+    # Catalyst-Zero: empty). Per-Sovereign overlays MAY override to
+    # wire a different NATS endpoint (e.g. a leaf-node connection) or
+    # set explicitly empty to force NATS-disabled mode.
+    natsURL: ""
   ferretdb:
     namespace: sme
     # FerretDB v1.24 — works against vanilla CNPG postgres:16. v2.x