This is an automated email from the ASF dual-hosted git repository. wu-sheng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-horizon-ui.git
commit d3cdf7747ef8f3e7bf766fce00c73b1071d92003 Author: Wu Sheng <[email protected]> AuthorDate: Tue May 12 10:12:06 2026 +0800 bff: oap proxy with cluster fan-out and preflight --- apps/bff/src/audit/logger.ts | 10 +- apps/bff/src/auth/middleware.ts | 67 +++++ apps/bff/src/auth/sessions.ts | 13 + apps/bff/src/config/loader.ts | 3 + apps/bff/src/oap/clients.ts | 105 ++++++++ apps/bff/src/oap/cluster.ts | 151 +++++++++++ apps/bff/src/oap/inspect-exec.ts | 200 +++++++++++++++ apps/bff/src/oap/mqe-target.ts | 196 ++++++++++++++ apps/bff/src/oap/preflight-routes.ts | 45 ++++ apps/bff/src/oap/preflight.ts | 184 +++++++++++++ apps/bff/src/oap/routes.ts | 482 +++++++++++++++++++++++++++++++++++ apps/bff/src/oap/server-time.ts | 187 ++++++++++++++ apps/bff/src/rbac/policy.ts | 10 + apps/bff/src/server.ts | 7 + 14 files changed, 1658 insertions(+), 2 deletions(-) diff --git a/apps/bff/src/audit/logger.ts b/apps/bff/src/audit/logger.ts index c10dc28..325c656 100644 --- a/apps/bff/src/audit/logger.ts +++ b/apps/bff/src/audit/logger.ts @@ -22,11 +22,17 @@ import { logger } from '../logger.js'; export interface AuditEvent { ts: string; // ISO-8601 - actor: string; + actor: string | null; action: string; // e.g. "rule.addOrUpdate", "auth.login", "role.update" + /** Optional verb that authorized the action (e.g. "rule:write"). */ + verb?: string; target?: string; - outcome: 'success' | 'failure'; + /** Free-form outcome string; common values include "success", "failure", + * the OAP `applyStatus` value, or `http_<code>`. */ + outcome: string; details?: Record<string, unknown>; + fromIp?: string; + sessionId?: string; } export class AuditLogger { diff --git a/apps/bff/src/auth/middleware.ts b/apps/bff/src/auth/middleware.ts new file mode 100644 index 0000000..074dd46 --- /dev/null +++ b/apps/bff/src/auth/middleware.ts @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Per-request auth + RBAC pre-handlers. Pluggable into any Fastify + * route via `{ preHandler: [requireAuth(deps), requireVerb(deps, 'rule:read')] }`. + * + * Two-step shape (require-auth, then require-verb) keeps the verb name + * visible at the route declaration site. + */ + +import type { FastifyReply, FastifyRequest } from 'fastify'; +import type { ConfigSource } from '../config/loader.js'; +import { sessionHasVerb } from '../rbac/policy.js'; +import type { Session, SessionStore } from './sessions.js'; + +declare module 'fastify' { + interface FastifyRequest { + session?: Session; + } +} + +export interface AuthDeps { + config: ConfigSource; + sessions: SessionStore; +} + +export function requireAuth(deps: AuthDeps) { + return async function authPreHandler(req: FastifyRequest, reply: FastifyReply): Promise<void> { + const cookieName = deps.config.current_().session.cookieName; + const sid = req.cookies?.[cookieName]; + if (!sid) { + return void reply.code(401).send({ error: 'unauthenticated' }); + } + const session = deps.sessions.get(sid); + if (!session) { + return void reply.code(401).send({ error: 'unauthenticated' }); + } + req.session = session; + }; +} + +export function requireVerb(deps: AuthDeps, verb: string) { + return async function verbPreHandler(req: 
FastifyRequest, reply: FastifyReply): Promise<void> { + const session = req.session; + if (!session) { + return void reply.code(401).send({ error: 'unauthenticated' }); + } + if (!sessionHasVerb(deps.config.current_(), session.roles, verb)) { + return void reply.code(403).send({ error: 'permission_denied', verb }); + } + }; +} diff --git a/apps/bff/src/auth/sessions.ts b/apps/bff/src/auth/sessions.ts index 6c19bae..18036bb 100644 --- a/apps/bff/src/auth/sessions.ts +++ b/apps/bff/src/auth/sessions.ts @@ -60,6 +60,19 @@ export class SessionStore { return session; } + // Read-without-touch — used by route handlers that just need identity + // and don't want to slide the TTL window. Returns `undefined` for + // expired sessions. + get(sid: string): Session | undefined { + const session = this.sessions.get(sid); + if (!session) return undefined; + if (Date.now() - session.lastSeenAt > this.ttlMs) { + this.sessions.delete(sid); + return undefined; + } + return session; + } + destroy(sid: string): void { this.sessions.delete(sid); } diff --git a/apps/bff/src/config/loader.ts b/apps/bff/src/config/loader.ts index 02e069a..bda42fe 100644 --- a/apps/bff/src/config/loader.ts +++ b/apps/bff/src/config/loader.ts @@ -24,6 +24,8 @@ import { configSchema, type HorizonConfig } from './schema.js'; export interface ConfigSource { readonly current: HorizonConfig; readonly path: string; + /** Function form for code paths that prefer a getter call. Returns the same as `.current`. 
*/ + current_(): HorizonConfig; onChange(fn: (cfg: HorizonConfig) => void): () => void; close(): Promise<void>; } @@ -64,6 +66,7 @@ export function loadConfig(configPath: string): ConfigSource { get current() { return current; }, + current_: () => current, path: absPath, onChange(fn) { listeners.add(fn); diff --git a/apps/bff/src/oap/clients.ts b/apps/bff/src/oap/clients.ts new file mode 100644 index 0000000..8838928 --- /dev/null +++ b/apps/bff/src/oap/clients.ts @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Construct OAP API clients on demand. Per-request construction is fine + * — the clients are thin classes with no per-instance state worth + * pooling. The factory exists so tests can inject a stub fetch and so + * config hot-reload picks up new admin URLs without restart. + */ + +import { + DslDebuggingClient, + InspectClient, + OalClient, + RuntimeRuleClient, + StatusClient, + type FetchLike, +} from '@skywalking-horizon-ui/api-client'; +import type { HorizonConfig } from '../config/schema.js'; + +export interface OapClients { + /** Build a runtime-rule client for one specific admin URL — used + * by the cluster fan-out, which talks to every URL. 
*/ + forUrl(adminUrl: string): RuntimeRuleClient; + /** Convenience — runtime-rule client for the *first* admin URL, + * used for reads and for writes (OAP's forward-RPC handles peer + * convergence). */ + primary(): RuntimeRuleClient; + /** Status / cluster-discovery client — port 12800. */ + status(): StatusClient; + /** OAL read-only management client for the *first* admin URL. + * OAL listing is per-node and identical across nodes (modulo + * binary-version drift, which is operator deployment discipline); + * the BFF doesn't fan-out for the catalog browse. */ + oal(): OalClient; + /** DSL-debugging client for the *first* admin URL. Session install + * and collect both fan-out internally on the OAP side; Studio + * hits one node and lets OAP do the cluster work. */ + debug(): DslDebuggingClient; + /** Build a DSL-debugging client for one specific admin URL — used + * by the per-node fan-out for `/dsl-debugging/status`. */ + debugForUrl(adminUrl: string): DslDebuggingClient; + /** Inspect API client (SWIP-14) — metadata-only catalog + entity + * enumeration. Identical across nodes (storage is shared), so we + * bind to the first admin URL. */ + inspect(): InspectClient; + /** All admin URLs, in config order. */ + adminUrls(): readonly string[]; +} + +export interface BuildOapClientsOptions { + fetch?: FetchLike; +} + +export function buildOapClients( + config: HorizonConfig, + opts: BuildOapClientsOptions = {}, +): OapClients { + const fetch = opts.fetch; + const primaryUrl = config.oap.adminUrls[0]!; + // Single source of truth for per-call timeout — every client + // constructed via this factory inherits it. 0 means no timeout + // (passed through verbatim to the clients). 
+ const timeoutMs = config.oap.timeoutMs; + return { + forUrl(adminUrl: string): RuntimeRuleClient { + return new RuntimeRuleClient({ adminUrl, fetch, timeoutMs }); + }, + primary(): RuntimeRuleClient { + return new RuntimeRuleClient({ adminUrl: primaryUrl, fetch, timeoutMs }); + }, + status(): StatusClient { + return new StatusClient({ statusUrl: config.oap.statusUrl, fetch, timeoutMs }); + }, + oal(): OalClient { + return new OalClient({ adminUrl: primaryUrl, fetch, timeoutMs }); + }, + debug(): DslDebuggingClient { + return new DslDebuggingClient({ adminUrl: primaryUrl, fetch, timeoutMs }); + }, + debugForUrl(adminUrl: string): DslDebuggingClient { + return new DslDebuggingClient({ adminUrl, fetch, timeoutMs }); + }, + inspect(): InspectClient { + return new InspectClient({ adminUrl: primaryUrl, fetch, timeoutMs }); + }, + adminUrls(): readonly string[] { + return config.oap.adminUrls; + }, + }; +} diff --git a/apps/bff/src/oap/cluster.ts b/apps/bff/src/oap/cluster.ts new file mode 100644 index 0000000..9257618 --- /dev/null +++ b/apps/bff/src/oap/cluster.ts @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * Cluster state — fan-out `/runtime/rule/list` to every configured OAP + * admin URL and pivot the result into a per-rule × per-node matrix the + * SPA renders directly. + * + * The fan-out runs in parallel; one slow node doesn't delay the + * others. Per-node failures are captured (so the operator can see + * which node is unreachable) without failing the whole call. + */ + +import type { + Catalog, + ListEnvelope, + ListRow, + LocalState, + RuleStatus, +} from '@skywalking-horizon-ui/api-client'; +import type { OapClients } from './clients.js'; + +export interface NodeListResult { + url: string; + envelope: ListEnvelope | null; + /** Set when `envelope === null`. */ + error?: string; +} + +export interface ClusterRulePerNode { + status: RuleStatus | null; + localState: LocalState | null; + contentHash: string | null; + lastApplyError: string; +} + +export interface ClusterRule { + catalog: Catalog; + name: string; + /** True iff every responding node has this rule with the same + * contentHash. False when any node is missing the rule, or any two + * nodes disagree on contentHash. */ + converged: boolean; + /** Map of admin URL → per-node state. Only nodes that reported the + * rule appear here: a node that responded without the rule is + * omitted, as is a node that didn't respond at all (its top-level + * `nodes[i].ok` surfaces the failure). */ + perNode: Record<string, ClusterRulePerNode>; +} + +export interface ClusterStateResponse { + generatedAt: number; + /** Per-node availability summary. Includes every configured admin + * URL, even when the node didn't respond. */ + nodes: { url: string; ok: boolean; error?: string }[]; + rules: ClusterRule[]; +} + +/** Fan-out helper. Returns a per-node array preserving the input + * order; failed nodes have `envelope: null` + `error`. 
*/ +export async function fetchPerNode(clients: OapClients): Promise<NodeListResult[]> { + const urls = clients.adminUrls(); + return Promise.all( + urls.map(async (url): Promise<NodeListResult> => { + try { + const envelope = await clients.forUrl(url).list(); + return { url, envelope }; + } catch (err) { + return { + url, + envelope: null, + error: err instanceof Error ? err.message : String(err), + }; + } + }), + ); +} + +/** Pivot per-node `/list` envelopes into a per-rule matrix. Pure — + * takes the fan-out result and returns the response shape. */ +export function pivotClusterState( + perNode: NodeListResult[], + generatedAt: number = Date.now(), +): ClusterStateResponse { + const respondingUrls = perNode.filter((n) => n.envelope !== null).map((n) => n.url); + + const seen = new Map<string, ClusterRule>(); + for (const node of perNode) { + if (!node.envelope) continue; + for (const rule of node.envelope.rules) { + const key = `${rule.catalog}/${rule.name}`; + let entry = seen.get(key); + if (!entry) { + entry = { + catalog: rule.catalog, + name: rule.name, + converged: false, // computed below + perNode: {}, + }; + seen.set(key, entry); + } + entry.perNode[node.url] = { + status: rule.status, + localState: rule.localState, + contentHash: rule.contentHash, + lastApplyError: extractLastApplyError(rule), + }; + } + } + + // Convergence pass — all responding nodes must have the rule with the + // same contentHash. + for (const entry of seen.values()) { + const presentOn = Object.keys(entry.perNode); + if (presentOn.length !== respondingUrls.length) { + entry.converged = false; + continue; + } + const hashes = new Set(Object.values(entry.perNode).map((p) => p.contentHash)); + entry.converged = hashes.size === 1; + } + + return { + generatedAt, + nodes: perNode.map((n) => ({ + url: n.url, + ok: n.envelope !== null, + ...(n.error !== undefined ? 
{ error: n.error } : {}), + })), + rules: [...seen.values()], + }; +} + +function extractLastApplyError(row: ListRow): string { + // Operator-pushed rows carry the field; bundled / orphan rows don't. + return 'lastApplyError' in row ? row.lastApplyError : ''; +} diff --git a/apps/bff/src/oap/inspect-exec.ts b/apps/bff/src/oap/inspect-exec.ts new file mode 100644 index 0000000..9e34dc3 --- /dev/null +++ b/apps/bff/src/oap/inspect-exec.ts @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * MQE fire — `POST /api/inspect/exec` forwards to OAP's GraphQL + * `query execExpression(expression, entity, duration, debug)` and returns + * the `ExpressionResult` payload to the SPA. + * + * SWIP-14 deliberately punts value queries off the admin port so that + * MQE auth / quotas / observability stay on a single surface. The BFF + * resolves that surface via `MqeTargetCache` (Phase 3) and is the + * single egress point for MQE traffic from Studio. + * + * The handler validates inbound shape, builds the GraphQL request, + * folds GraphQL errors into the BFF's envelope (`mqe_error`), and + * returns `data.execExpression` verbatim on success. The SPA does not + * need GraphQL knowledge. 
+ */ + +import type { FastifyReply } from 'fastify'; +import type { FetchLike, ExpressionResult, MqeEntity } from '@skywalking-horizon-ui/api-client'; +import { INSPECT_STEPS, isInspectDate, type InspectStep } from '@skywalking-horizon-ui/api-client'; +import type { MqeTarget } from './mqe-target.js'; + +interface DurationInput { + start: string; + end: string; + step: InspectStep; + coldStage?: boolean; +} + +interface ExecBody { + expression: string; + entity: MqeEntity; + duration: DurationInput; + debug?: boolean; +} + +export interface ExecDeps { + fetch: FetchLike; + /** Per-call timeout (ms). 0 disables. */ + timeoutMs: number; +} + +/* SkyWalking's MQE entry point is on `Query`, not `Mutation` + * (see oap-server/.../metrics-v3.graphqls — `extend type Query`). */ +const GRAPHQL_QUERY = + 'query Exec($expression: String!, $entity: Entity!, $duration: Duration!, $debug: Boolean) {\n' + + ' execExpression(expression: $expression, entity: $entity, duration: $duration, debug: $debug) {\n' + + ' type\n' + + ' error\n' + + ' results {\n' + + ' metric { labels { key value } }\n' + + ' values { id value traceID owner { scope serviceID serviceName normal serviceInstanceID serviceInstanceName endpointID endpointName } }\n' + + ' }\n' + + ' }\n' + + '}\n'; + +/** Validate the request body and either send a 400 / return null, or + * return the parsed body for execution. 
*/ +export function parseExecBody(body: unknown, reply: FastifyReply): ExecBody | null { + if (typeof body !== 'object' || body === null) { + reply.code(400).send({ error: 'invalid_body' }); + return null; + } + const b = body as Partial<ExecBody>; + if (typeof b.expression !== 'string' || b.expression.length === 0) { + reply.code(400).send({ error: 'missing_expression' }); + return null; + } + if (typeof b.entity !== 'object' || b.entity === null) { + reply.code(400).send({ error: 'missing_entity' }); + return null; + } + if (typeof (b.entity as MqeEntity).scope !== 'string') { + reply.code(400).send({ error: 'invalid_entity', detail: 'scope is required' }); + return null; + } + if (typeof b.duration !== 'object' || b.duration === null) { + reply.code(400).send({ error: 'missing_duration' }); + return null; + } + const d = b.duration as DurationInput; + if (typeof d.start !== 'string' || typeof d.end !== 'string') { + reply.code(400).send({ error: 'invalid_duration', detail: 'start and end must be strings' }); + return null; + } + if (typeof d.step !== 'string' || !INSPECT_STEPS.includes(d.step.toUpperCase() as InspectStep)) { + reply.code(400).send({ + error: 'invalid_duration', + detail: `step must be one of ${INSPECT_STEPS.join(', ')}`, + }); + return null; + } + const step = d.step.toUpperCase() as InspectStep; + if (!isInspectDate(d.start, step)) { + reply + .code(400) + .send({ error: 'invalid_duration', detail: `start does not match ${step} format` }); + return null; + } + if (!isInspectDate(d.end, step)) { + reply + .code(400) + .send({ error: 'invalid_duration', detail: `end does not match ${step} format` }); + return null; + } + return { + expression: b.expression, + entity: b.entity as MqeEntity, + duration: { + start: d.start, + end: d.end, + step, + ...(d.coldStage !== undefined ? { coldStage: !!d.coldStage } : {}), + }, + ...(b.debug !== undefined ? 
{ debug: !!b.debug } : {}), + }; +} + +interface GraphQlEnvelope { + data?: { execExpression?: ExpressionResult }; + errors?: { message: string; path?: string[] }[]; +} + +/** Fire the MQE query against the resolved base. Returns the + * `ExpressionResult` on success. Throws a {@link MqeFireError} on a + * non-2xx, GraphQL-error, or empty response; fetch errors propagate. */ +export async function fireMqe( + target: MqeTarget, + req: ExecBody, + deps: ExecDeps, +): Promise<ExpressionResult> { + const url = `${target.baseUrl.replace(/\/$/, '')}/graphql`; + const payload = { + query: GRAPHQL_QUERY, + variables: { + expression: req.expression, + entity: req.entity, + duration: req.duration, + debug: req.debug ?? false, + }, + }; + let init: RequestInit = { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'application/json', + }, + body: JSON.stringify(payload), + }; + let timer: ReturnType<typeof setTimeout> | null = null; + if (deps.timeoutMs > 0) { + const ctrl = new AbortController(); + timer = setTimeout(() => ctrl.abort(), deps.timeoutMs); + init = { ...init, signal: ctrl.signal }; + } + try { + const res = await deps.fetch(url, init); + if (!res.ok) { + const text = await res.text(); + throw new MqeFireError(`MQE HTTP ${res.status}: ${text.slice(0, 200)}`, []); + } + const env = (await res.json()) as GraphQlEnvelope; + if (env.errors && env.errors.length > 0) { + const msg = env.errors.map((e) => e.message).join('; '); + throw new MqeFireError(`MQE error: ${msg}`, env.errors); + } + if (!env.data || !env.data.execExpression) { + throw new MqeFireError('MQE response missing data.execExpression', []); + } + return env.data.execExpression; + } finally { + if (timer) clearTimeout(timer); + } +} + +export class MqeFireError extends Error { + constructor( + message: string, + public readonly graphqlErrors: { message: string; path?: string[] }[], + ) { + super(message); + this.name = 'MqeFireError'; + } +} diff --git a/apps/bff/src/oap/mqe-target.ts 
b/apps/bff/src/oap/mqe-target.ts new file mode 100644 index 0000000..b6589e8 --- /dev/null +++ b/apps/bff/src/oap/mqe-target.ts @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Resolve the GraphQL base URL for MQE (`execExpression`) calls. + * + * Resolution order: + * + * 1. Operator override in studio.yaml under `oap.mqe.{host,port}`. + * Both fields are independently optional — if only host is set, + * port is discovered, and vice versa. + * + * 2. Admin-server `GET /debugging/config/dump` — a flat + * `Map<String,String>` with keys like `core.default.restPort` + * and (when the sharing-server module is enabled) + * `sharing-server.default.restPort`. Prefer the sharing-server + * values because 10.5+ defaults the query GraphQL there; fall + * back to core's REST otherwise. + * + * 3. Host fallback: if the discovered bind host is empty / wildcard + * (`0.0.0.0`, `::`), reuse the admin URL's hostname. Port-forward + * and ingress setups frequently bind on a wildcard but expose a + * reachable address as the admin URL. + * + * The result is cached in-process for `CACHE_TTL_MS`. 
Cache is busted + * by either elapsed time or an explicit `invalidate()` call (driven by + * the SPA's manual refresh button). + */ + +import type { FetchLike } from '@skywalking-horizon-ui/api-client'; +import type { HorizonConfig } from '../config/schema.js'; + +export interface MqeTarget { + /** e.g. `http://oap-rest.cluster.local:12800` — no trailing slash. */ + baseUrl: string; + /** Human-readable rationale for the operator: `sharing-server`, + * `core.restPort`, `studio.yaml override`, `admin host fallback`, + * combinations thereof. */ + via: string; + /** What the operator's studio.yaml had set (echo, not discovery). */ + configured: { host?: string; port?: number }; +} + +export interface ResolveDeps { + config(): HorizonConfig; + fetch: FetchLike; +} + +const CACHE_TTL_MS = 60_000; + +interface CacheEntry { + target: MqeTarget; + expiresAt: number; +} + +export class MqeTargetCache { + private entry: CacheEntry | null = null; + + invalidate(): void { + this.entry = null; + } + + async resolve(deps: ResolveDeps): Promise<MqeTarget> { + const now = Date.now(); + if (this.entry && this.entry.expiresAt > now) return this.entry.target; + const target = await resolveMqeTarget(deps); + this.entry = { target, expiresAt: now + CACHE_TTL_MS }; + return target; + } +} + +async function resolveMqeTarget(deps: ResolveDeps): Promise<MqeTarget> { + const cfg = deps.config().oap; + const configured: { host?: string; port?: number } = {}; + if (cfg.mqe?.host !== undefined) configured.host = cfg.mqe.host; + if (cfg.mqe?.port !== undefined) configured.port = cfg.mqe.port; + + // Fast path: full override — no admin call needed. + if (configured.host !== undefined && configured.port !== undefined) { + return { + baseUrl: `http://${configured.host}:${configured.port}`, + via: 'studio.yaml override (host + port)', + configured, + }; + } + + // Otherwise we need the config dump. Fetch from the first admin URL. 
+ const adminUrl = cfg.adminUrls[0]!; + const dump = await fetchConfigDump(adminUrl, deps.fetch, cfg.timeoutMs); + const adminHost = new URL(adminUrl).hostname; + + const picked = pickFromDump(dump, adminHost); + + const finalHost = configured.host ?? picked.host; + const finalPort = configured.port ?? picked.port; + + if (finalPort === undefined) { + throw new Error( + 'mqe target: could not discover REST port from /debugging/config/dump (neither sharing-server nor core REST appears in the dump)', + ); + } + + const viaParts: string[] = []; + viaParts.push( + configured.host !== undefined ? 'host from studio.yaml' : `host from ${picked.hostFrom}`, + ); + viaParts.push( + configured.port !== undefined ? 'port from studio.yaml' : `port from ${picked.portFrom}`, + ); + + return { + baseUrl: `http://${finalHost}:${finalPort}`, + via: viaParts.join(', '), + configured, + }; +} + +interface PickResult { + host: string; + port: number | undefined; + hostFrom: string; + portFrom: string; +} + +function pickFromDump(dump: Record<string, string>, adminHost: string): PickResult { + // Prefer sharing-server over core (10.5+ defaults the query + // GraphQL on the sharing-server REST). + const sharingHost = dump['sharing-server.default.restHost']; + const sharingPortStr = dump['sharing-server.default.restPort']; + const coreHost = dump['core.default.restHost']; + const corePortStr = dump['core.default.restPort']; + + const sharingPort = parsePort(sharingPortStr); + const corePort = parsePort(corePortStr); + + const preferSharing = sharingPort !== undefined; + const host = preferSharing ? sharingHost : coreHost; + const port = preferSharing ? sharingPort : corePort; + const moduleLabel = preferSharing ? 'sharing-server.restHost' : 'core.restHost'; + const portLabel = preferSharing ? 'sharing-server.restPort' : 'core.restPort'; + + // Wildcard host fallback. OAP commonly binds on 0.0.0.0; the operator + // reaches it through the admin URL's host. 
+ const isWildcard = !host || host === '0.0.0.0' || host === '::' || host === ''; + const resolvedHost = isWildcard ? adminHost : host; + const hostFrom = isWildcard ? `admin URL host (${moduleLabel} was wildcard)` : moduleLabel; + + return { host: resolvedHost, port, hostFrom, portFrom: portLabel }; +} + +function parsePort(raw: string | undefined): number | undefined { + if (raw === undefined || raw === '') return undefined; + const n = Number.parseInt(raw, 10); + if (!Number.isFinite(n) || n <= 0 || n > 65535) return undefined; + return n; +} + +async function fetchConfigDump( + adminUrl: string, + fetch: FetchLike, + timeoutMs: number, +): Promise<Record<string, string>> { + const base = adminUrl.replace(/\/$/, ''); + const url = `${base}/debugging/config/dump`; + let init: RequestInit = { method: 'GET', headers: { Accept: 'application/json' } }; + let timer: ReturnType<typeof setTimeout> | null = null; + if (timeoutMs > 0) { + const ctrl = new AbortController(); + timer = setTimeout(() => ctrl.abort(), timeoutMs); + init = { ...init, signal: ctrl.signal }; + } + try { + const res = await fetch(url, init); + if (!res.ok) { + const body = await res.text(); + throw new Error(`config dump failed: HTTP ${res.status} from ${url} — ${body.slice(0, 200)}`); + } + return (await res.json()) as Record<string, string>; + } finally { + if (timer) clearTimeout(timer); + } +} diff --git a/apps/bff/src/oap/preflight-routes.ts b/apps/bff/src/oap/preflight-routes.ts new file mode 100644 index 0000000..960bbc2 --- /dev/null +++ b/apps/bff/src/oap/preflight-routes.ts @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import type { FastifyInstance, FastifyReply, FastifyRequest } from 'fastify'; +import type { FetchLike } from '@skywalking-horizon-ui/api-client'; +import type { ConfigSource } from '../config/loader.js'; +import { requireAuth } from '../auth/middleware.js'; +import type { SessionStore } from '../auth/sessions.js'; +import { runPreflight } from './preflight.js'; + +export interface PreflightRouteDeps { + config: ConfigSource; + sessions: SessionStore; + fetch?: FetchLike; +} + +/** `GET /api/preflight` — interrogates OAP's config-dump and returns + * per-module enablement. Authenticated but ungated by verb — every + * logged-in user can see whether OAP is correctly set up. */ +export function registerPreflightRoutes(app: FastifyInstance, deps: PreflightRouteDeps): void { + const auth = requireAuth(deps); + app.get( + '/api/preflight', + { preHandler: auth }, + async (_req: FastifyRequest, reply: FastifyReply) => { + const fetchImpl = deps.fetch ?? globalThis.fetch.bind(globalThis); + const result = await runPreflight(deps.config.current, fetchImpl); + return reply.send(result); + }, + ); +} diff --git a/apps/bff/src/oap/preflight.ts b/apps/bff/src/oap/preflight.ts new file mode 100644 index 0000000..71e08b6 --- /dev/null +++ b/apps/bff/src/oap/preflight.ts @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
/*
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * Preflight check — interrogates `/debugging/config/dump` on OAP and
 * reports which of Studio's required OAP modules are enabled. The
 * SPA shows a one-time modal at login when any required selector is
 * missing, listing the env var and what UI breaks without it.
 *
 * The dump returns a `Map<String,String>` of dotted keys
 * `<module>.<provider>.<property>`. A module is enabled iff at least
 * one key with its prefix appears in the dump.
 *
 * If admin-server itself is unreachable we return early with
 * `adminReachable: false` and every module marked `enabled: false`;
 * the operator's first move is "check OAP / admin port" rather than
 * "set selectors".
 */

import type { FetchLike } from '@skywalking-horizon-ui/api-client';
import type { HorizonConfig } from '../config/schema.js';

export interface PreflightModule {
  /** OAP module name as it appears in the config-dump key prefix. */
  name: string;
  /** The env var that controls this module's selector. */
  envVar: string;
  /** True when Studio depends on this module being on. */
  required: boolean;
  /** True iff the dump carries at least one key with this module's prefix. */
  enabled: boolean;
  /** What part of Studio's UI breaks when this module is off. */
  affects: string;
}

export interface PreflightResult {
  /** Admin URL that was probed; empty when none is configured. */
  adminUrl: string;
  /** True iff `/debugging/config/dump` responded 2xx with a JSON object. */
  adminReachable: boolean;
  /** Short reason when `adminReachable` is false. */
  adminError?: string;
  modules: PreflightModule[];
  /** Total keys in the dump. Diagnostic only. */
  dumpKeyCount: number;
  /** `Date.now()` taken when the preflight started. */
  generatedAt: number;
}

interface ModuleDef {
  name: string;
  envVar: string;
  required: boolean;
  affects: string;
}

const REQUIRED_MODULES: readonly ModuleDef[] = [
  {
    name: 'admin-server',
    envVar: 'SW_ADMIN_SERVER',
    required: true,
    affects:
      'Everything Studio does against the admin port. Without admin-server, the other three modules fail at boot with ModuleNotFoundException.',
  },
  {
    name: 'receiver-runtime-rule',
    envVar: 'SW_RECEIVER_RUNTIME_RULE',
    required: true,
    affects:
      "DSL Management (Catalog, OAL catalog), Editor save/load, Cluster status rule matrix, Live debugger rule picker, and the Inspect drawer's source attribution.",
  },
  {
    name: 'dsl-debugging',
    envVar: 'SW_DSL_DEBUGGING',
    required: true,
    affects:
      'Live debugger across MAL / LAL / OAL (start / poll / stop) and the DSL-debugging health pane on Cluster status.',
  },
  {
    name: 'inspect',
    envVar: 'SW_INSPECT',
    required: true,
    affects:
      'The Inspect page — every /api/inspect/* call returns 404 inspect_not_enabled and the page shows a banner instead of the board.',
  },
];

/** Project the static module table into result rows, asking `isEnabled` per module name. */
function describeModules(isEnabled: (name: string) => boolean): PreflightModule[] {
  return REQUIRED_MODULES.map((m) => ({
    name: m.name,
    envVar: m.envVar,
    required: m.required,
    affects: m.affects,
    enabled: isEnabled(m.name),
  }));
}

/** Result shape shared by every "could not read the dump" outcome. */
function unreachable(adminUrl: string, adminError: string, generatedAt: number): PreflightResult {
  return {
    adminUrl,
    adminReachable: false,
    adminError,
    modules: describeModules(() => false),
    dumpKeyCount: 0,
    generatedAt,
  };
}

/**
 * Probe the first configured admin URL and report which required
 * modules appear in its config dump.
 *
 * Never rejects for an unusable dump: a missing admin URL, a transport
 * error, a non-2xx status, or a body that is not a JSON object all
 * come back as `adminReachable: false` with `adminError` set and every
 * module marked disabled.
 *
 * @param config Horizon config; reads `oap.adminUrls[0]` and `oap.timeoutMs`.
 * @param fetch  Fetch implementation used for the single GET.
 * @returns The preflight report.
 */
export async function runPreflight(
  config: HorizonConfig,
  fetch: FetchLike,
): Promise<PreflightResult> {
  const generatedAt = Date.now();
  const adminUrl = config.oap.adminUrls[0];
  if (!adminUrl) {
    // An empty list used to crash on `undefined.replace(...)`.
    return unreachable('', 'no admin URL configured (oap.adminUrls is empty)', generatedAt);
  }

  const dump = await fetchConfigDump(adminUrl, fetch, config.oap.timeoutMs);
  if (!dump.ok) {
    return unreachable(adminUrl, dump.error, generatedAt);
  }

  // Keys are `<module>.<provider>.<property>`; only the first segment matters.
  const keys = Object.keys(dump.body);
  const enabledPrefixes = new Set<string>();
  for (const key of keys) {
    const dot = key.indexOf('.');
    const top = dot === -1 ? key : key.slice(0, dot);
    if (top) enabledPrefixes.add(top);
  }

  return {
    adminUrl,
    adminReachable: true,
    modules: describeModules((name) => enabledPrefixes.has(name)),
    dumpKeyCount: keys.length,
    generatedAt,
  };
}

interface DumpOk {
  ok: true;
  body: Record<string, string>;
}
interface DumpErr {
  ok: false;
  error: string;
}

/**
 * GET `<adminUrl>/debugging/config/dump` and return its JSON body.
 *
 * @param adminUrl  Base admin URL; trailing slashes are ignored.
 * @param fetch     Fetch implementation.
 * @param timeoutMs Abort the request after this many ms; `<= 0` disables the timeout.
 * @returns `DumpOk` with the parsed object, or `DumpErr` with a short reason
 *          (non-2xx status, timeout, transport / parse error, or a body that
 *          is not a JSON object).
 */
async function fetchConfigDump(
  adminUrl: string,
  fetch: FetchLike,
  timeoutMs: number,
): Promise<DumpOk | DumpErr> {
  const url = `${adminUrl.replace(/\/+$/, '')}/debugging/config/dump`;
  const ctrl = timeoutMs > 0 ? new AbortController() : null;
  const timer = ctrl ? setTimeout(() => ctrl.abort(), timeoutMs) : null;
  const init: RequestInit = {
    method: 'GET',
    headers: { Accept: 'application/json' },
    ...(ctrl ? { signal: ctrl.signal } : {}),
  };
  try {
    const res = await fetch(url, init);
    if (!res.ok) {
      const text = (await res.text()).slice(0, 200);
      return { ok: false, error: `HTTP ${res.status}${text ? ` — ${text}` : ''}` };
    }
    const parsed: unknown = await res.json();
    // `null`, arrays and scalars are valid JSON but would break the
    // caller's `Object.keys` / prefix scan — report them as an error.
    if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
      return { ok: false, error: 'config dump is not a JSON object' };
    }
    return { ok: true, body: parsed as Record<string, string> };
  } catch (err) {
    if (ctrl?.signal.aborted) {
      return { ok: false, error: `timed out after ${timeoutMs} ms` };
    }
    return { ok: false, error: err instanceof Error ? err.message : String(err) };
  } finally {
    if (timer) clearTimeout(timer);
  }
}

// Patch context retained from the commit mail — start of the next file:
// diff --git a/apps/bff/src/oap/routes.ts b/apps/bff/src/oap/routes.ts
// new file mode 100644
// index 0000000..18076df
// --- /dev/null
// +++ b/apps/bff/src/oap/routes.ts
// @@ -0,0 +1,482 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 */
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Studio's `/api/*` surface — eight route families that together cover + * everything v1's catalog browse, editor, cluster matrix, and dump UIs + * need. + * + * Each handler: + * 1. Runs through `requireAuth` (handled by route preHandler). + * 2. Resolves the verb required for this call. + * 3. Calls the OAP client, mapping errors back to HTTP via the + * `RuntimeRuleApiError` envelope so OAP's applyStatus codes + * reach the SPA verbatim. + * 4. Audits every mutating call, with the actor / verb / outcome. 
+ */ + +import type { FastifyInstance, FastifyReply, FastifyRequest } from 'fastify'; +import { + RuntimeRuleApiError, + isCatalog, + type Catalog, + type DeleteMode, +} from '@skywalking-horizon-ui/api-client'; +import type { ConfigSource } from '../config/loader.js'; +import type { AuditLogger } from '../audit/logger.js'; +import { requireAuth } from '../auth/middleware.js'; +import { sessionHasVerb } from '../rbac/policy.js'; +import type { Session } from '../auth/sessions.js'; +import type { SessionStore } from '../auth/sessions.js'; +import { buildOapClients, type OapClients } from './clients.js'; +import { fetchPerNode, pivotClusterState } from './cluster.js'; +import type { FetchLike } from '@skywalking-horizon-ui/api-client'; + +export interface OapRouteDeps { + config: ConfigSource; + sessions: SessionStore; + audit: AuditLogger; + /** Test seam — replaces `globalThis.fetch` in every OAP call. */ + fetch?: FetchLike; +} + +const TRUTHY = new Set(['true', '1', 'yes']); + +/** Register every `/api/*` route on the given Fastify instance. The + * text/plain content-type parser must already be registered (see + * server.ts). 
/**
 * Register every `/api/*` route on the given Fastify instance. The
 * text/plain content-type parser must already be registered (see
 * server.ts).
 *
 * Every route runs `requireAuth` as its preHandler and then gates on a
 * verb via `ensureVerb`. OAP failures are mapped through
 * `passOapError`; the three mutating routes additionally write an
 * audit record on both success and failure.
 *
 * @param app  Fastify instance to attach the routes to.
 * @param deps Config source, session store, audit logger and optional fetch seam.
 */
export function registerOapRoutes(app: FastifyInstance, deps: OapRouteDeps): void {
  const auth = requireAuth(deps);

  /** Build OAP clients from `deps.config.current` as it is at call time. */
  function clients(): OapClients {
    return buildOapClients(deps.config.current, { fetch: deps.fetch });
  }

  // ── catalog browse ────────────────────────────────────────────────

  app.get(
    '/api/catalog/list',
    { preHandler: auth },
    async (req: FastifyRequest, reply: FastifyReply) => {
      if (!ensureVerb(req, reply, deps, 'rule:read')) return;
      const catalog = parseOptionalCatalog(req.query, reply);
      // `undefined` plus a non-empty `catalog` param means the value was
      // rejected and the 400 has already been sent.
      if (catalog === undefined && hasCatalogParam(req.query)) return;
      try {
        const env = await clients().primary().list(catalog);
        return reply.send(env);
      } catch (err) {
        return passOapError(err, reply);
      }
    },
  );

  app.get(
    '/api/catalog/bundled',
    { preHandler: auth },
    async (req: FastifyRequest, reply: FastifyReply) => {
      if (!ensureVerb(req, reply, deps, 'rule:read')) return;
      const catalog = parseRequiredCatalog(req.query, reply);
      if (!catalog) return;
      const withContent = parseBoolean((req.query as Record<string, string>).withContent, true);
      try {
        const list = await clients().primary().listBundled(catalog, withContent);
        return reply.send(list);
      } catch (err) {
        return passOapError(err, reply);
      }
    },
  );

  // ── single rule fetch ─────────────────────────────────────────────

  app.get('/api/rule', { preHandler: auth }, async (req: FastifyRequest, reply: FastifyReply) => {
    if (!ensureVerb(req, reply, deps, 'rule:read')) return;
    const q = req.query as Record<string, string | undefined>;
    const catalog = parseRequiredCatalog(q, reply);
    if (!catalog) return;
    if (!q.name) {
      return reply.code(400).send({ error: 'missing_name' });
    }
    const source = q.source as 'runtime' | 'bundled' | undefined;
    if (source !== undefined && source !== 'runtime' && source !== 'bundled') {
      return reply.code(400).send({ error: 'invalid_source', value: source });
    }
    const ifNoneMatch = req.headers['if-none-match'] as string | undefined;
    try {
      const got = await clients()
        .primary()
        .get({
          catalog,
          name: q.name,
          ...(source !== undefined ? { source } : {}),
          ...(ifNoneMatch !== undefined ? { ifNoneMatch } : {}),
        });
      // Validators are echoed on both the 304 and the 200 path.
      reply.header('etag', got.etag);
      reply.header('x-sw-content-hash', got.contentHash);
      reply.header('x-sw-status', got.status);
      if ('notModified' in got) {
        return reply.code(304).send();
      }
      reply.header('content-type', 'application/x-yaml; charset=utf-8');
      reply.header('x-sw-source', got.source);
      reply.header('x-sw-update-time', String(got.updateTime));
      return reply.send(got.content);
    } catch (err) {
      return passOapError(err, reply);
    }
  });

  // ── write paths (audit on every call) ─────────────────────────────

  app.post('/api/rule', { preHandler: auth }, async (req: FastifyRequest, reply: FastifyReply) => {
    const q = req.query as Record<string, string | undefined>;
    const catalog = parseRequiredCatalog(q, reply);
    if (!catalog) return;
    if (!q.name) return reply.code(400).send({ error: 'missing_name' });
    if (typeof req.body !== 'string' || req.body.length === 0) {
      return reply.code(400).send({ error: 'empty_body' });
    }
    const allowStorageChange = parseBoolean(q.allowStorageChange, false);
    const force = parseBoolean(q.force, false);
    // Either flag escalates the call to the structural write verb.
    const verb = allowStorageChange || force ? 'rule:write:structural' : 'rule:write';
    if (!ensureVerb(req, reply, deps, verb)) return;

    try {
      const result = await clients().primary().addOrUpdate({
        catalog,
        name: q.name,
        body: req.body,
        allowStorageChange,
        force,
      });
      auditMutation(deps, req, 'addOrUpdate', verb, catalog, q.name, result.applyStatus, {
        allowStorageChange,
        force,
      });
      return reply.send(result);
    } catch (err) {
      return passOapErrorAudit(err, reply, deps, req, 'addOrUpdate', verb, catalog, q.name);
    }
  });

  app.post(
    '/api/rule/inactivate',
    { preHandler: auth },
    async (req: FastifyRequest, reply: FastifyReply) => {
      const q = req.query as Record<string, string | undefined>;
      const catalog = parseRequiredCatalog(q, reply);
      if (!catalog) return;
      if (!q.name) return reply.code(400).send({ error: 'missing_name' });
      if (!ensureVerb(req, reply, deps, 'rule:write')) return;
      try {
        const result = await clients().primary().inactivate(catalog, q.name);
        auditMutation(deps, req, 'inactivate', 'rule:write', catalog, q.name, result.applyStatus);
        return reply.send(result);
      } catch (err) {
        return passOapErrorAudit(
          err,
          reply,
          deps,
          req,
          'inactivate',
          'rule:write',
          catalog,
          q.name,
        );
      }
    },
  );

  app.post(
    '/api/rule/delete',
    { preHandler: auth },
    async (req: FastifyRequest, reply: FastifyReply) => {
      const q = req.query as Record<string, string | undefined>;
      const catalog = parseRequiredCatalog(q, reply);
      if (!catalog) return;
      if (!q.name) return reply.code(400).send({ error: 'missing_name' });
      const mode = parseDeleteMode(q.mode, reply);
      if (mode === null) return;
      /* `mode=revertToBundled` is a structural change — it swaps the
       * active row's identity back to the bundled twin, the same
       * write-class that `rule:write:structural` already gates on
       * the addOrUpdate path. A caller with only `rule:delete`
       * must not be able to revert. */
      const verb = mode === 'revertToBundled' ? 'rule:write:structural' : 'rule:delete';
      if (!ensureVerb(req, reply, deps, verb)) return;
      try {
        const result = await clients().primary().delete(catalog, q.name, mode);
        auditMutation(deps, req, 'delete', verb, catalog, q.name, result.applyStatus, {
          mode,
        });
        return reply.send(result);
      } catch (err) {
        return passOapErrorAudit(err, reply, deps, req, 'delete', verb, catalog, q.name, {
          mode,
        });
      }
    },
  );

  // ── cluster state (BFF fan-out) ───────────────────────────────────

  app.get(
    '/api/cluster/state',
    { preHandler: auth },
    async (req: FastifyRequest, reply: FastifyReply) => {
      if (!ensureVerb(req, reply, deps, 'cluster:read')) return;
      // Same error envelope as every other OAP-bound route, so a throw
      // from the fan-out does not surface as a bare framework 500.
      try {
        const perNode = await fetchPerNode(clients());
        return reply.send(pivotClusterState(perNode));
      } catch (err) {
        return passOapError(err, reply);
      }
    },
  );

  // ── dump (streaming passthrough) ──────────────────────────────────

  /** Handler factory: `null` dumps everything, otherwise a single catalog. */
  const dumpHandler = (catalog: Catalog | null) =>
    async function (req: FastifyRequest, reply: FastifyReply) {
      if (!ensureVerb(req, reply, deps, 'rule:read')) return;
      try {
        const upstream = await clients()
          .primary()
          .dump(catalog ?? undefined);
        const ct = upstream.headers.get('content-type') ?? 'application/octet-stream';
        const cd = upstream.headers.get('content-disposition');
        reply.header('content-type', ct);
        if (cd) reply.header('content-disposition', cd);
        return reply.send(upstream.body ?? '');
      } catch (err) {
        return passOapError(err, reply);
      }
    };

  app.get('/api/dump', { preHandler: auth }, dumpHandler(null));

  app.get(
    '/api/dump/:catalog',
    { preHandler: auth },
    async (req: FastifyRequest, reply: FastifyReply) => {
      const params = req.params as { catalog: string };
      if (!isCatalog(params.catalog)) {
        return reply.code(400).send({ error: 'invalid_catalog', value: params.catalog });
      }
      return dumpHandler(params.catalog)(req, reply);
    },
  );

  // ── OAL read-only browse (SWIP-13 §4.1) ──────────────────────────
  // Read-only — no audit. `rule:read` gate.
  //
  // Wire shape from RuntimeOalRestHandler.java:
  //   /files          — { files: string[], count }
  //   /files/{name}   — text/plain raw .oal content
  //   /rules          — per-dispatcher listing { sources, count }
  //   /rules/{source} — single source detail with `status: live | no_holder`

  app.get(
    '/api/oal/files',
    { preHandler: auth },
    async (req: FastifyRequest, reply: FastifyReply) => {
      if (!ensureVerb(req, reply, deps, 'rule:read')) return;
      try {
        const list = await clients().oal().listFiles();
        return reply.send(list);
      } catch (err) {
        return passOapError(err, reply);
      }
    },
  );

  app.get(
    '/api/oal/files/:name',
    { preHandler: auth },
    async (req: FastifyRequest, reply: FastifyReply) => {
      if (!ensureVerb(req, reply, deps, 'rule:read')) return;
      const params = req.params as { name: string };
      if (!params.name) return reply.code(400).send({ error: 'missing_name' });
      try {
        const content = await clients().oal().getFileContent(params.name);
        if (content === null) return reply.code(404).send({ error: 'not_found' });
        reply.header('content-type', 'text/plain; charset=utf-8');
        return reply.send(content);
      } catch (err) {
        return passOapError(err, reply);
      }
    },
  );

  app.get(
    '/api/oal/rules',
    { preHandler: auth },
    async (req: FastifyRequest, reply: FastifyReply) => {
      if (!ensureVerb(req, reply, deps, 'rule:read')) return;
      try {
        const sources = await clients().oal().listSources();
        return reply.send(sources);
      } catch (err) {
        return passOapError(err, reply);
      }
    },
  );

  app.get(
    '/api/oal/rules/:source',
    { preHandler: auth },
    async (req: FastifyRequest, reply: FastifyReply) => {
      if (!ensureVerb(req, reply, deps, 'rule:read')) return;
      const params = req.params as { source: string };
      if (!params.source) return reply.code(400).send({ error: 'missing_source' });
      try {
        const detail = await clients().oal().getSource(params.source);
        if (detail === null) return reply.code(404).send({ error: 'not_found' });
        return reply.send(detail);
      } catch (err) {
        return passOapError(err, reply);
      }
    },
  );
}

// ── helpers ─────────────────────────────────────────────────────────

/**
 * Parse a boolean query flag.
 *
 * @param v        Raw query value.
 * @param fallback Returned when `v` is absent or not a single string
 *                 (a repeated query key arrives as an array).
 * @returns `true` iff the lower-cased value is in `TRUTHY`; otherwise
 *          `false`, or `fallback` as described above.
 */
function parseBoolean(v: string | undefined, fallback: boolean): boolean {
  if (typeof v !== 'string') return fallback;
  return TRUTHY.has(v.toLowerCase());
}

/**
 * True when the query carries a non-empty `catalog` value.
 *
 * An empty `catalog=` counts as absent, matching how
 * `parseOptionalCatalog` treats it. Treating it as present made the
 * list route return without ever sending a reply.
 */
function hasCatalogParam(q: unknown): boolean {
  if (typeof q !== 'object' || q === null || !('catalog' in q)) return false;
  const raw = (q as Record<string, unknown>).catalog;
  return raw !== undefined && raw !== '';
}

/** When the `catalog` query is missing or empty, returns `undefined`
 *  and lets the caller proceed. When present-but-invalid, sends 400 and
 *  returns `undefined`; the caller should check `hasCatalogParam` to
 *  disambiguate. */
function parseOptionalCatalog(q: unknown, reply: FastifyReply): Catalog | undefined {
  const raw = (q as Record<string, string | undefined>).catalog;
  if (raw === undefined || raw === '') return undefined;
  if (!isCatalog(raw)) {
    reply.code(400).send({ error: 'invalid_catalog', value: raw });
    return undefined;
  }
  return raw;
}

/** Returns the validated catalog, or `null` after sending a 400
 *  (`missing_catalog` when absent/empty, `invalid_catalog` otherwise). */
function parseRequiredCatalog(q: unknown, reply: FastifyReply): Catalog | null {
  const raw = (q as Record<string, string | undefined>).catalog;
  if (!raw) {
    reply.code(400).send({ error: 'missing_catalog' });
    return null;
  }
  if (!isCatalog(raw)) {
    reply.code(400).send({ error: 'invalid_catalog', value: raw });
    return null;
  }
  return raw;
}
/** Returns the parsed `DeleteMode` or `null` when the wire value was
 *  invalid (and a 400 was sent). A missing or empty value maps to the
 *  default mode `''`. */
function parseDeleteMode(raw: string | undefined, reply: FastifyReply): DeleteMode | null {
  if (raw === undefined || raw === '') return '';
  if (raw === 'revertToBundled') return 'revertToBundled';
  reply.code(400).send({ error: 'invalid_delete_mode', value: raw });
  return null;
}

/**
 * Gate a request on an RBAC verb.
 *
 * Sends 401 `unauthenticated` when the request has no session and 403
 * `permission_denied` when the session's roles do not grant `verb`.
 *
 * @returns `true` when the caller may proceed; `false` when a reply
 *          has already been sent.
 */
function ensureVerb(
  req: FastifyRequest,
  reply: FastifyReply,
  deps: OapRouteDeps,
  verb: string,
): boolean {
  const session: Session | undefined = req.session;
  if (!session) {
    reply.code(401).send({ error: 'unauthenticated' });
    return false;
  }
  if (!sessionHasVerb(deps.config.current, session.roles, verb)) {
    reply.code(403).send({ error: 'permission_denied', verb });
    return false;
  }
  return true;
}

/**
 * Translate a failure from an OAP client call into an HTTP reply.
 *
 * A `RuntimeRuleApiError` is forwarded with its own status and body;
 * anything else becomes 502 `oap_unreachable` with the error message.
 */
function passOapError(err: unknown, reply: FastifyReply): FastifyReply {
  if (err instanceof RuntimeRuleApiError) {
    return reply.code(err.status).send(err.body);
  }
  return reply.code(502).send({
    error: 'oap_unreachable',
    message: err instanceof Error ? err.message : String(err),
  });
}

/**
 * Derive the audit `outcome` string for a failed OAP call: the body's
 * `applyStatus` when it is a string, otherwise `http_<status>` for a
 * `RuntimeRuleApiError`, otherwise `oap_unreachable`.
 */
function auditOutcomeForError(err: unknown): string {
  if (!(err instanceof RuntimeRuleApiError)) return 'oap_unreachable';
  const apiErr: RuntimeRuleApiError = err;
  const body: unknown = apiErr.body;
  if (typeof body === 'object' && body !== null && 'applyStatus' in body) {
    const status: unknown = (body as { applyStatus: unknown }).applyStatus;
    // Only a string is a usable outcome; anything else falls through.
    if (typeof status === 'string') return status;
  }
  return `http_${apiErr.status}`;
}

/**
 * Audit a failed mutating call, then reply exactly as `passOapError` does.
 *
 * @param details Extra fields merged into the audit record's `details`
 *                after `catalog` and `name`.
 */
function passOapErrorAudit(
  err: unknown,
  reply: FastifyReply,
  deps: OapRouteDeps,
  req: FastifyRequest,
  action: string,
  verb: string,
  catalog: Catalog,
  name: string,
  details: Record<string, unknown> = {},
): FastifyReply {
  auditMutation(deps, req, action, verb, catalog, name, auditOutcomeForError(err), details);
  return passOapError(err, reply);
}

/**
 * Write one audit record for a mutating call.
 *
 * The actor and session id come from `req.session` (actor is `null`
 * when there is no session); `fromIp` is `req.ip`.
 */
function auditMutation(
  deps: OapRouteDeps,
  req: FastifyRequest,
  action: string,
  verb: string,
  catalog: Catalog,
  name: string,
  outcome: string,
  details: Record<string, unknown> = {},
): void {
  deps.audit.record({
    action,
    verb,
    actor: req.session?.username ?? null,
    outcome,
    details: { catalog, name, ...details },
    fromIp: req.ip,
    sessionId: req.session?.sid,
  });
}

// Patch context retained from the commit mail — start of the next file:
// diff --git a/apps/bff/src/oap/server-time.ts b/apps/bff/src/oap/server-time.ts
// new file mode 100644
// index 0000000..ab49e37
// --- /dev/null
// +++ b/apps/bff/src/oap/server-time.ts
// @@ -0,0 +1,187 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/**
 * Server-time discovery — proxy for OAP's GraphQL `getTimeInfo`
 * query. Returns the OAP server's UTC offset (in minutes) and current
 * timestamp. Studio's SPA caches this once and uses it to:
 *
 *   - Display dates in the browser's local timezone (operator-facing).
 *   - Convert browser-local dates to server-timezone strings when
 *     firing MQE (`yyyy-MM-dd HHmm` etc. — OAP parses these in its
 *     own TZ, not UTC).
 *
 * The booster UI uses the same path (`graphql/fragments/app.ts:17`).
 * The wire shape carries `timezone` in +/- HHMM form — either a string
 * such as `"+0800"` or a bare integer such as `800` for UTC+8, `-500`
 * for UTC-5, `530` for UTC+5:30. We translate to plain minutes so the
 * SPA doesn't have to repeat the parsing.
 */

import type { FetchLike } from '@skywalking-horizon-ui/api-client';
import type { HorizonConfig } from '../config/schema.js';
import type { MqeTargetCache } from './mqe-target.js';

export interface ServerTime {
  /** OAP server's UTC offset in minutes. `+480` for UTC+8, `-300`
   *  for UTC-5, `+330` for India (UTC+5:30). */
  offsetMinutes: number;
  /** OAP-side current epoch millis (snapshot at fetch time). */
  currentTimestampMillis: number;
  /** Where the offset came from — `oap` is the real graphql call;
   *  `fallback` is the local BFF clock returned when OAP's
   *  `getTimeInfo` is unreachable. */
  source: 'oap' | 'fallback';
  /** Resolved MQE base URL the BFF queried. Diagnostic. */
  mqeBaseUrl?: string;
  /** Short error message when source === 'fallback'. */
  error?: string;
}

const GRAPHQL_QUERY =
  'query ServerTime {\n getTimeInfo {\n timezone\n currentTimestamp\n }\n}\n';

interface CacheEntry {
  value: ServerTime;
  expiresAt: number;
}

/** How long a value that really came from OAP is served from cache. */
const CACHE_TTL_MS = 5 * 60_000;
/** Shorter lifetime for fallbacks so we recover soon after OAP returns. */
const FALLBACK_TTL_MS = 15_000;

/** Single-entry, time-bounded cache in front of `resolveServerTime`. */
export class ServerTimeCache {
  private entry: CacheEntry | null = null;

  /** Drop the cached value; the next `get` resolves afresh. */
  invalidate(): void {
    this.entry = null;
  }

  /**
   * Return the cached server time, resolving it first when the cache
   * is empty or expired.
   *
   * @param deps Config accessor, fetch implementation and MQE target resolver.
   */
  async get(deps: ServerTimeDeps): Promise<ServerTime> {
    if (this.entry && this.entry.expiresAt > Date.now()) return this.entry.value;
    const value = await resolveServerTime(deps);
    /* Don't cache fallbacks for the full TTL — recover faster when
     * OAP comes back. */
    const ttl = value.source === 'oap' ? CACHE_TTL_MS : FALLBACK_TTL_MS;
    // Start the TTL when the value arrives, not when the lookup began:
    // a lookup that ran into a long timeout would otherwise store an
    // entry that is already expired.
    this.entry = { value, expiresAt: Date.now() + ttl };
    return value;
  }
}

export interface ServerTimeDeps {
  config(): HorizonConfig;
  fetch: FetchLike;
  mqeTarget: MqeTargetCache;
}

/**
 * Ask OAP for its time info over GraphQL.
 *
 * Never rejects: a transport error, non-2xx status, GraphQL `errors`,
 * a missing field or an unparseable timezone all yield a
 * `source: 'fallback'` result built from the local clock.
 */
async function resolveServerTime(deps: ServerTimeDeps): Promise<ServerTime> {
  let mqeBaseUrl: string | undefined;
  /* AbortController gives every server-time call the same upper bound
   * the other OAP-bound BFF calls already respect (`oap.timeoutMs`).
   * Without it a hung /graphql leaks the request indefinitely. */
  const timeoutMs = deps.config().oap.timeoutMs;
  const ctrl = timeoutMs > 0 ? new AbortController() : null;
  const timer = ctrl ? setTimeout(() => ctrl.abort(), timeoutMs) : null;
  try {
    const target = await deps.mqeTarget.resolve({
      config: () => deps.config(),
      fetch: deps.fetch,
    });
    mqeBaseUrl = target.baseUrl;
    const init: RequestInit = {
      method: 'POST',
      headers: { 'Content-Type': 'application/json', Accept: 'application/json' },
      body: JSON.stringify({ query: GRAPHQL_QUERY }),
      ...(ctrl ? { signal: ctrl.signal } : {}),
    };
    const res = await deps.fetch(`${target.baseUrl.replace(/\/$/, '')}/graphql`, init);
    if (!res.ok) {
      const txt = (await res.text()).slice(0, 200);
      return fallback(`HTTP ${res.status}: ${txt}`, mqeBaseUrl);
    }
    const env = (await res.json()) as {
      data?: { getTimeInfo?: { timezone?: number | string; currentTimestamp?: number } };
      errors?: { message: string }[];
    };
    if (env.errors && env.errors.length > 0) {
      return fallback(env.errors.map((e) => e.message).join('; '), mqeBaseUrl);
    }
    const info = env.data?.getTimeInfo;
    if (
      !info ||
      info.timezone === undefined ||
      info.timezone === null ||
      typeof info.currentTimestamp !== 'number'
    ) {
      return fallback('getTimeInfo missing timezone / currentTimestamp', mqeBaseUrl);
    }
    const offsetMinutes = parseTimezone(info.timezone);
    if (offsetMinutes === null) {
      return fallback(
        `getTimeInfo timezone is not parseable: ${JSON.stringify(info.timezone)}`,
        mqeBaseUrl,
      );
    }
    return {
      offsetMinutes,
      currentTimestampMillis: info.currentTimestamp,
      source: 'oap',
      mqeBaseUrl,
    };
  } catch (err) {
    return fallback(err instanceof Error ? err.message : String(err), mqeBaseUrl);
  } finally {
    if (timer) clearTimeout(timer);
  }
}

/**
 * Build a `source: 'fallback'` result from the BFF's own clock and
 * timezone.
 *
 * @param error      Short reason the OAP lookup failed.
 * @param mqeBaseUrl Included in the result only when already resolved.
 */
function fallback(error: string, mqeBaseUrl?: string): ServerTime {
  return {
    // `getTimezoneOffset` is minutes *behind* UTC, hence the negation.
    offsetMinutes: -new Date().getTimezoneOffset(),
    currentTimestampMillis: Date.now(),
    source: 'fallback',
    error,
    ...(mqeBaseUrl !== undefined ? { mqeBaseUrl } : {}),
  };
}
/* Accepts "+HHMM", "-HHMM", "HHMM", and the colon variants
 * "+HH:MM" / "HH:MM" for forward compatibility. */
const TIMEZONE_TEXT = /^([+-]?)(\d{1,2}):?(\d{2})$/;

/** OAP's `timezone` field is typed as String on the GraphQL schema
 *  and arrives in `+HHMM` / `-HHMM` form (e.g. `"+0000"`, `"-0500"`,
 *  `"+0530"`). Some older OAP builds emit a bare integer; both
 *  shapes are accepted here.
 *
 *  @param tz Wire value of `getTimeInfo.timezone`.
 *  @returns The offset in minutes, or `null` when unparseable: a
 *           non-string / non-number, a non-integer number, text that
 *           does not match the accepted forms, or a minutes part of
 *           60 or more (checked on both the string and integer paths).
 *           A zero offset is always returned as `0`, never `-0`. */
export function parseTimezone(tz: string | number): number | null {
  if (typeof tz === 'number') {
    // Rejects NaN, ±Infinity and fractional values in one check.
    if (!Number.isInteger(tz)) return null;
    if (Math.abs(tz) % 100 >= 60) return null;
    return hhmmIntegerToMinutes(tz);
  }
  if (typeof tz !== 'string') return null;
  const m = TIMEZONE_TEXT.exec(tz.trim());
  if (!m) return null;
  const sign = m[1] === '-' ? -1 : 1;
  const hours = Number(m[2]);
  const mins = Number(m[3]);
  if (mins >= 60) return null;
  const total = sign * (hours * 60 + mins);
  // "-0000" would otherwise produce negative zero.
  return total === 0 ? 0 : total;
}

/** Legacy path: integer `+/- HHMM`. Performs no validation — the low
 *  two decimal digits are taken as minutes as-is. */
export function hhmmIntegerToMinutes(tz: number): number {
  const sign = tz < 0 ? -1 : 1;
  const abs = Math.abs(tz);
  const hours = Math.trunc(abs / 100);
  const mins = abs % 100;
  return sign * (hours * 60 + mins);
}

/*
 * Patch context retained verbatim from the commit mail. This hunk
 * belongs to a different file (rbac/policy.ts) and is only partially
 * visible here, so it is kept as text rather than live code.
 *
 * diff --git a/apps/bff/src/rbac/policy.ts b/apps/bff/src/rbac/policy.ts
 * index d016173..86c5f5d 100644
 * --- a/apps/bff/src/rbac/policy.ts
 * +++ b/apps/bff/src/rbac/policy.ts
 * @@ -20,8 +20,18 @@ import { forbidden, unauthorized } from '../errors.js';
 *  import type { Session } from '../auth/sessions.js';
 *  import type { SessionStore } from '../auth/sessions.js';
 *  import type { ConfigSource } from '../config/loader.js';
 * +import type { HorizonConfig } from '../config/schema.js';
 *  import { hasVerb, resolveVerbsForRoles, type Verb } from './verbs.js';
 *
 * +export function sessionHasVerb(
 * +  config: HorizonConfig,
 * +  roles: readonly string[],
 * +  required: string,
 * +): boolean {
 * +  const verbs = resolveVerbsForRoles(config.rbac.roles, roles, config.rbac.enabled);
 * +  return hasVerb(verbs, required);
 * +}
 * +
 *  // Augment Fastify's request type so handlers can `req.session` without casts.
 */
declare module 'fastify' { interface FastifyRequest { diff --git a/apps/bff/src/server.ts b/apps/bff/src/server.ts index f302175..6acae05 100644 --- a/apps/bff/src/server.ts +++ b/apps/bff/src/server.ts @@ -21,6 +21,8 @@ import { AuditLogger } from './audit/logger.js'; import { registerAuthRoutes } from './auth/routes.js'; import { SessionStore } from './auth/sessions.js'; import { loadConfig, type ConfigSource } from './config/loader.js'; +import { registerOapRoutes } from './oap/routes.js'; +import { registerPreflightRoutes } from './oap/preflight-routes.js'; import { HttpError } from './errors.js'; import { logger, loggerOptions } from './logger.js'; @@ -47,7 +49,12 @@ await audit.open(); await app.register(cookie); +// Text/plain body parser — the rule editor sends raw YAML to /api/rule. +app.addContentTypeParser('text/plain', { parseAs: 'string' }, (_req, body, done) => done(null, body)); + registerAuthRoutes(app, source, sessions, audit); +registerOapRoutes(app, { config: source, sessions, audit }); +registerPreflightRoutes(app, { config: source, sessions }); app.get('/api/health', async () => ({ status: 'ok',
