fix(cloud): /cloud?view=list&kind=nodes fans out across all registered clusters (Refs TBD-E6, C3-010) (#1705)

Symptom (t22, chart 1.4.166): /cloud?view=list&kind=nodes on a
3-region Sovereign showed only 1 row (the primary cluster's CP node)
while the aggregate /cloud chips correctly counted Region 3/3,
Cluster 3/3, NodePool 3/3.

Root cause: HandleK8sList (and HandleK8sStream) used
resolveChrootClusterID to pick exactly one registered cluster id
(the "sovereign-<fqdn>" primary). Once the chroot's k8sCache.Factory
self-registered secondaries from PR #1579 + #1581, those clusters
were known to the cache but never queried by these handlers — the
secondary-cluster nodes, LBs, etc. were invisible to the operator.

This is the same D16 fan-out gap the dashboard hit on t136 (memory
session_2026_05_17_t142_6_of_6_GREEN.md): the original
"resolveChrootClusterID len(clusters)!=1" assumption did not survive
the multi-region kubeconfig fan-out.

Fix: enumerate every registered cluster id via k8sCache.Clusters()
when serving GET /api/v1/sovereigns/{id}/k8s/{kind}, merge the
per-cluster List() results, and stamp each row with its source
cluster under top-level `cluster`. Same pattern in HandleK8sStream:
accept SSE events from any allowed cluster, snapshot initial state
from every cluster. SAR-gate now keys on (user, cluster, kind, ns)
so secondary-cluster authorization is correctly enforced.

Wire-shape backward compatibility: single-cluster Sovereigns keep
the legacy shape byte-identical (Cluster=<id>, no Clusters[], no
per-row cluster stamp). New Clusters[] response field appears only
when ≥2 clusters contributed rows.

Refs WBS TBD-E6 (C3-010).

Co-authored-by: hatiyildiz <hati.yildiz@openova.io>
This commit is contained in:
e3mrah 2026-05-18 17:31:39 +04:00 committed by GitHub
parent f686e30823
commit 7adc8f1d75
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 352 additions and 32 deletions

View File

@ -65,6 +65,14 @@ type K8sListResponse struct {
// carries the same number for clients that prefer headers over
// body fields.
AgeSeconds float64 `json:"ageSeconds"`
// Clusters — every cluster id that contributed rows to this
// response. Populated when the Sovereign's k8scache.Factory has
// >1 cluster registered (multi-region fan-out). Each item's
// top-level `cluster` field carries the id of the cluster that
// produced it; this header field lets consumers know up-front
// which set of ids to expect. Empty/absent on single-cluster
// Sovereigns for backward-compat. See TBD-E6 (C3-010).
Clusters []string `json:"clusters,omitempty"`
}
// HandleK8sList — GET /api/v1/sovereigns/{id}/k8s/{kind}
@ -137,56 +145,143 @@ func (h *Handler) HandleK8sList(w http.ResponseWriter, r *http.Request) {
sel = parsed
}
items, age, err := h.k8sCache.List(clusterID, kindName, sel)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
// Multi-region fan-out (TBD-E6 / C3-010, 2026-05-18): when the
// Sovereign's k8sCache has more than one cluster registered
// (primary + N secondaries via the secondary-kubeconfig handover
// hook, PRs #1579 + #1581), enumerate items from EVERY registered
// cluster and merge — stamping each row with its source cluster
// id. Without this, /cloud?view=list&kind=nodes on a 3-region
// Sovereign showed only the primary cluster's 1 node despite the
// aggregate /dashboard chips correctly reporting 3/3 (caught on
// t22 chart 1.4.166). This mirrors the dashboard fan-out shipped
// in PR #1583 for the same root cause.
//
// The single-cluster path falls back to the resolved primary id
// only — preserves the original semantics for legacy callers and
// keeps wire-shape backward-compatible (Cluster=primary,
// Clusters=[] omitted).
fanOutIDs := []string{clusterID}
if h.k8sCache != nil {
seen := map[string]struct{}{clusterID: {}}
for _, cid := range h.k8sCache.Clusters() {
if _, ok := seen[cid]; ok {
continue
}
seen[cid] = struct{}{}
fanOutIDs = append(fanOutIDs, cid)
}
}
// Carry the source cluster id alongside each item positionally
// — the parallel slice survives sort + paginate + SAR-gate so
// the final flatten step can stamp each row with its true
// origin. Using a parallel slice (vs annotating the cached
// Unstructured pointer) avoids mutating the Indexer's shared
// cache; multiple concurrent readers would race otherwise.
var items []*unstructured.Unstructured
var itemClusters []string
var age time.Duration
contributingClusters := make([]string, 0, len(fanOutIDs))
for _, cid := range fanOutIDs {
cItems, cAge, err := h.k8sCache.List(cid, kindName, sel)
if err != nil {
// Primary failure is fatal (matches pre-fan-out behaviour);
// secondary failures degrade silently — better to render N-1
// regions than to 404 the whole page.
if cid == clusterID {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
h.log.Warn("k8s list fan-out skipped cluster",
"cluster", cid, "kind", kindName, "err", err)
continue
}
contributingClusters = append(contributingClusters, cid)
items = append(items, cItems...)
for range cItems {
itemClusters = append(itemClusters, cid)
}
// Surface the worst (oldest) staleness across the fan-out —
// the operator must see "any region stale" rather than the
// freshest cache hiding lag elsewhere.
if cAge > age {
age = cAge
}
}
// Field-selector — restricted to metadata.{name,namespace}. The
// catalyst-api's Indexer doesn't index spec/status, so exposing
// the full apiserver field-selector grammar would be misleading.
if fs := q.Get("fieldSelector"); fs != "" {
items = applyFieldSelector(items, fs)
items, itemClusters = applyFieldSelectorWithClusters(items, itemClusters, fs)
}
// Namespace pre-filter (cheap; before SAR loop).
if ns != "" {
filtered := items[:0]
for _, it := range items {
fItems := items[:0]
fClusters := itemClusters[:0]
for i, it := range items {
if it.GetNamespace() == ns {
filtered = append(filtered, it)
fItems = append(fItems, it)
fClusters = append(fClusters, itemClusters[i])
}
}
items = filtered
items = fItems
itemClusters = fClusters
}
// SAR gate per (user, kind, namespace).
// SAR gate per (user, kind, cluster, namespace). The fan-out makes
// the cluster id part of the cache key so a user with `get` only
// on the primary doesn't accidentally see secondary-cluster rows.
user := h.k8sUser(r)
if user != "" && h.sarCache != nil {
gated := items[:0]
seenNS := map[string]bool{}
allowedNS := map[string]bool{}
for _, it := range items {
gItems := items[:0]
gClusters := itemClusters[:0]
seen := map[string]bool{}
allowed := map[string]bool{}
for i, it := range items {
cid := itemClusters[i]
n := it.GetNamespace()
if !seenNS[n] {
seenNS[n] = true
allowedNS[n] = h.sarCache.Allowed(r.Context(), h.k8sCache, user, clusterID, kindName, n, "get")
key := cid + "/" + n
if !seen[key] {
seen[key] = true
allowed[key] = h.sarCache.Allowed(r.Context(), h.k8sCache, user, cid, kindName, n, "get")
}
if allowedNS[n] {
gated = append(gated, it)
if allowed[key] {
gItems = append(gItems, it)
gClusters = append(gClusters, cid)
}
}
items = gated
items = gItems
itemClusters = gClusters
}
// Stable order by (namespace, name) — important so pagination
// cursors are repeatable.
sort.SliceStable(items, func(i, j int) bool {
ai := items[i].GetNamespace() + "/" + items[i].GetName()
bj := items[j].GetNamespace() + "/" + items[j].GetName()
// Stable order by (cluster, namespace, name) — pagination cursor
// is repeatable across the merged set. Cluster goes first so the
// UI groups rows by region; within a region the kubectl-style
// (ns, name) order is preserved.
sortIdx := make([]int, len(items))
for i := range sortIdx {
sortIdx[i] = i
}
sort.SliceStable(sortIdx, func(i, j int) bool {
a := sortIdx[i]
b := sortIdx[j]
if itemClusters[a] != itemClusters[b] {
return itemClusters[a] < itemClusters[b]
}
ai := items[a].GetNamespace() + "/" + items[a].GetName()
bj := items[b].GetNamespace() + "/" + items[b].GetName()
return ai < bj
})
sortedItems := make([]*unstructured.Unstructured, len(items))
sortedClusters := make([]string, len(items))
for i, j := range sortIdx {
sortedItems[i] = items[j]
sortedClusters[i] = itemClusters[j]
}
items = sortedItems
itemClusters = sortedClusters
// Pagination.
total := len(items)
@ -198,6 +293,7 @@ func (h *Handler) HandleK8sList(w http.ResponseWriter, r *http.Request) {
endIdx = total
}
page := items[startIdx:endIdx]
pageClusters := itemClusters[startIdx:endIdx]
cont := ""
if endIdx < total {
cont = base64.RawURLEncoding.EncodeToString([]byte(strconv.Itoa(endIdx)))
@ -219,13 +315,26 @@ func (h *Handler) HandleK8sList(w http.ResponseWriter, r *http.Request) {
// `feedback_no_mvp_no_workarounds.md` every hoisted value is REAL
// data — it's the same byte the embedded Object carries, surfaced
// at the top level under a stable key.
flatPage := flattenK8sListItems(kindName, page)
// Clusters header + per-row stamp — only surface when we actually
// fanned out (>1 cluster contributed); keeps single-cluster wire
// shape byte-identical to pre-TBD-E6 so legacy UI clients don't
// see a new top-level `cluster` key on each row.
var clustersOut []string
stampClusters := pageClusters
if len(contributingClusters) <= 1 {
clustersOut = nil
stampClusters = nil
} else {
clustersOut = contributingClusters
}
flatPage := flattenK8sListItemsWithClusters(kindName, page, stampClusters)
resp := K8sListResponse{
Kind: kindName,
Cluster: clusterID,
Items: flatPage,
Continue: cont,
AgeSeconds: age.Seconds(),
Clusters: clustersOut,
}
// Codemod a3: scrub `null` leaves so the matrix `must_not_contain:
// ["null"]` asserts pass without changing the apiserver-faithful
@ -254,8 +363,22 @@ func (h *Handler) HandleK8sList(w http.ResponseWriter, r *http.Request) {
// so the Indexer's cached pointer is never mutated (cached objects are
// shared with every concurrent reader; mutation would race).
func flattenK8sListItems(kind string, items []*unstructured.Unstructured) []*unstructured.Unstructured {
return flattenK8sListItemsWithClusters(kind, items, nil)
}
// flattenK8sListItemsWithClusters mirrors flattenK8sListItems but also
// stamps each row with its source cluster id (TBD-E6 / C3-010,
// 2026-05-18 multi-region fan-out). When `clusters` is nil or shorter
// than `items`, the cluster stamp is omitted for that index — backward
// compatible with single-cluster callers that pass nil.
//
// The cluster id is hoisted under the top-level `cluster` key so the
// SPA's K8sListPage (and any other consumer of the flat shape) can
// render a per-row region column without round-tripping to the
// dashboard fan-out. The embedded `metadata` stays untouched.
func flattenK8sListItemsWithClusters(kind string, items []*unstructured.Unstructured, clusters []string) []*unstructured.Unstructured {
out := make([]*unstructured.Unstructured, 0, len(items))
for _, it := range items {
for idx, it := range items {
if it == nil {
continue
}
@ -266,6 +389,9 @@ func flattenK8sListItems(kind string, items []*unstructured.Unstructured) []*uns
for k, v := range it.Object {
base[k] = v
}
if idx < len(clusters) && clusters[idx] != "" {
base["cluster"] = clusters[idx]
}
// Region annotation hoist — applies to every kind that carries
// a node/region label. The annotation key matches the cilium
// + cluster-autoscaler convention.
@ -547,6 +673,16 @@ func (h *Handler) HandleK8sStream(w http.ResponseWriter, r *http.Request) {
return
}
// Multi-region fan-out (TBD-E6 / C3-010): if the Sovereign has
// secondary kubeconfigs registered, accept SSE events from every
// registered cluster so /cloud?view=list&kind=nodes renders all
// 3 region nodes on a 3-region Sovereign (not just the primary's
// 1). Single-cluster Sovereigns keep the primary-only filter.
allowedClusters := map[string]struct{}{clusterID: {}}
for _, cid := range h.k8sCache.Clusters() {
allowedClusters[cid] = struct{}{}
}
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming not supported", http.StatusInternalServerError)
@ -618,8 +754,13 @@ func (h *Handler) HandleK8sStream(w http.ResponseWriter, r *http.Request) {
// requested kinds. Triggered by ?initialState=1; off by default
// because the UI typically seeds via the REST list endpoint.
if r.URL.Query().Get("initialState") == "1" {
if err := h.streamInitialState(r.Context(), w, flusher, clusterID, kindsFilter, user); err != nil {
return
// Fan out the initial snapshot across every registered cluster
// (TBD-E6) so a freshly-opened K8sListPage sees rows from all
// regions on connect, before live events flow.
for cid := range allowedClusters {
if err := h.streamInitialState(r.Context(), w, flusher, cid, kindsFilter, user); err != nil {
return
}
}
}
@ -641,8 +782,11 @@ func (h *Handler) HandleK8sStream(w http.ResponseWriter, r *http.Request) {
return
}
// Wrong cluster is filtered at the dispatch level by
// kind/empty filter; cluster filter applied here.
if ev.Cluster != clusterID {
// kind/empty filter; cluster filter applied here. With
// TBD-E6 fan-out we accept any cluster that's registered
// on this Sovereign's k8sCache, not just the resolved
// primary — so multi-region rows flow through.
if _, allowed := allowedClusters[ev.Cluster]; !allowed {
continue
}
if user != "" && h.sarCache != nil {
@ -650,7 +794,7 @@ func (h *Handler) HandleK8sStream(w http.ResponseWriter, r *http.Request) {
if ev.Object != nil {
ns = ev.Object.GetNamespace()
}
if !h.sarCache.Allowed(r.Context(), h.k8sCache, user, clusterID, ev.Kind, ns, "get") {
if !h.sarCache.Allowed(r.Context(), h.k8sCache, user, ev.Cluster, ev.Kind, ns, "get") {
continue
}
}
@ -863,6 +1007,57 @@ func applyFieldSelector(items []*unstructured.Unstructured, fs string) []*unstru
return out
}
// applyFieldSelectorWithClusters mirrors applyFieldSelector but keeps
// the parallel cluster-id slice in lock-step (TBD-E6 fan-out). Returns
// (filtered items, filtered clusters).
func applyFieldSelectorWithClusters(items []*unstructured.Unstructured, clusters []string, fs string) ([]*unstructured.Unstructured, []string) {
clauses := strings.Split(fs, ",")
outItems := items[:0]
outClusters := clusters[:0]
for i, it := range items {
ok := true
for _, c := range clauses {
c = strings.TrimSpace(c)
if c == "" {
continue
}
parts := strings.SplitN(c, "=", 2)
if len(parts) != 2 {
continue
}
key := strings.TrimSpace(parts[0])
val := strings.TrimSpace(parts[1])
switch key {
case "metadata.name":
if it.GetName() != val {
ok = false
}
case "metadata.namespace":
if it.GetNamespace() != val {
ok = false
}
default:
if strings.HasPrefix(key, "metadata.labels.") {
labelKey := strings.TrimPrefix(key, "metadata.labels.")
if it.GetLabels()[labelKey] != val {
ok = false
}
}
}
if !ok {
break
}
}
if ok {
outItems = append(outItems, it)
if i < len(clusters) {
outClusters = append(outClusters, clusters[i])
}
}
}
return outItems, outClusters
}
func parseIntDefault(s string, def int) int {
if s == "" {
return def

View File

@ -371,6 +371,131 @@ func TestHandleK8sList_NamespaceAliasFiltering(t *testing.T) {
}
}
// TestHandleK8sList_FanOutAcrossClusters — TBD-E6 / C3-010, 2026-05-18.
//
// Multi-region Sovereigns register N kubeconfigs in the k8sCache
// (primary + N-1 secondaries via the handover hook, PRs #1579 + #1581).
// /cloud?view=list&kind=nodes must enumerate items from every
// registered cluster and stamp each row with its source cluster id.
//
// Pre-fix: HandleK8sList queried only the resolveChrootClusterID
// result (primary), returning 1 row on a 3-region Sovereign while the
// aggregate /dashboard chips correctly counted 3.
func TestHandleK8sList_FanOutAcrossClusters(t *testing.T) {
podA := newPod("default", "primary-pod")
podB := newPod("default", "secondary-pod")
scheme := runtime.NewScheme()
scheme.AddKnownTypeWithName(schema.GroupVersionKind{Version: "v1", Kind: "PodList"}, &unstructured.UnstructuredList{})
scheme.AddKnownTypeWithName(schema.GroupVersionKind{Version: "v1", Kind: "Pod"}, &unstructured.Unstructured{})
gvrList := map[schema.GroupVersionResource]string{
{Version: "v1", Resource: "pods"}: "PodList",
}
dynA := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme, gvrList, podA)
dynB := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme, gvrList, podB)
cfg := k8scache.Config{
Logger: quietLog(),
Registry: minimalRegistry(),
Clusters: []k8scache.ClusterRef{
{ID: "primary", DynamicClient: dynA, CoreClient: kfake.NewSimpleClientset()},
{ID: "sin-2", DynamicClient: dynB, CoreClient: kfake.NewSimpleClientset()},
},
}
f, err := k8scache.NewFactory(cfg)
if err != nil {
t.Fatalf("NewFactory: %v", err)
}
if err := f.Start(context.Background()); err != nil {
t.Fatalf("Start: %v", err)
}
t.Cleanup(f.Stop)
// Wait for both clusters' pod informers to sync.
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
ia, _, _ := f.List("primary", "pod", nil)
ib, _, _ := f.List("sin-2", "pod", nil)
if len(ia) == 1 && len(ib) == 1 {
break
}
time.Sleep(20 * time.Millisecond)
}
h := &Handler{log: quietLog()}
h.SetK8sCache(f, k8scache.NewSARCache(), "X-Forwarded-User")
r := newRouter(h)
req := httptest.NewRequest("GET", "/api/v1/sovereigns/primary/k8s/pod", nil)
rec := httptest.NewRecorder()
r.ServeHTTP(rec, req)
if rec.Code != 200 {
t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String())
}
var resp K8sListResponse
if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil {
t.Fatalf("decode: %v", err)
}
if len(resp.Items) != 2 {
t.Fatalf("fan-out expected 2 items (primary+sin-2); got %d", len(resp.Items))
}
// Both source clusters MUST appear in the Clusters header.
gotClusters := map[string]bool{}
for _, c := range resp.Clusters {
gotClusters[c] = true
}
if !gotClusters["primary"] || !gotClusters["sin-2"] {
t.Fatalf("expected Clusters=[primary,sin-2]; got %v", resp.Clusters)
}
// Each row carries its source cluster stamp under top-level "cluster".
stamps := map[string]bool{}
for _, it := range resp.Items {
if cid, ok, _ := unstructured.NestedString(it.Object, "cluster"); ok && cid != "" {
stamps[cid] = true
}
}
if !stamps["primary"] || !stamps["sin-2"] {
t.Fatalf("expected each row to carry source cluster id; got %v", stamps)
}
}
// TestHandleK8sList_SingleClusterBackwardCompat — TBD-E6.
//
// Single-cluster Sovereigns MUST keep the pre-fan-out wire shape
// byte-for-byte: top-level Cluster=<id>, no Clusters header, no
// per-row cluster stamp. Guards against regressions in old UI clients
// that key off the legacy shape.
func TestHandleK8sList_SingleClusterBackwardCompat(t *testing.T) {
pod := newPod("default", "x")
f := newFactoryWithPod(t, pod)
h := &Handler{log: quietLog()}
h.SetK8sCache(f, k8scache.NewSARCache(), "X-Forwarded-User")
r := newRouter(h)
req := httptest.NewRequest("GET", "/api/v1/sovereigns/alpha/k8s/pod", nil)
rec := httptest.NewRecorder()
r.ServeHTTP(rec, req)
if rec.Code != 200 {
t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String())
}
var resp K8sListResponse
if err := json.NewDecoder(rec.Body).Decode(&resp); err != nil {
t.Fatalf("decode: %v", err)
}
if resp.Cluster != "alpha" {
t.Fatalf("expected Cluster=alpha, got %q", resp.Cluster)
}
if len(resp.Clusters) != 0 {
t.Fatalf("single-cluster path must omit Clusters header; got %v", resp.Clusters)
}
if len(resp.Items) != 1 {
t.Fatalf("expected 1 item, got %d", len(resp.Items))
}
// Backward compat: single-cluster path MUST NOT inject `cluster`
// on each row (legacy UI clients don't expect the key).
if _, ok, _ := unstructured.NestedString(resp.Items[0].Object, "cluster"); ok {
t.Fatalf("single-cluster path must not stamp per-row cluster; got cluster=%v", resp.Items[0].Object["cluster"])
}
}
// newFactoryWithMultiplePods builds an in-memory K8s cache pre-populated
// with N pods across N namespaces — exercises the namespace-filter path
// (single-ns cache wouldn't surface the bug).