wangzhigang1999 commented on code in PR #7385:
URL: https://github.com/apache/kyuubi/pull/7385#discussion_r3063145680
##########
externals/kyuubi-data-agent-engine/src/main/scala/org/apache/kyuubi/engine/dataagent/operation/ApproveToolCall.scala:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.dataagent.operation
+
+import com.fasterxml.jackson.databind.ObjectMapper
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.engine.dataagent.provider.DataAgentProvider
+import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationState}
+import org.apache.kyuubi.session.Session
+
+/**
+ * A lightweight synchronous operation that resolves a pending tool approval request.
+ *
+ * The client sends a statement with the format `__approve:<requestId>` or `__deny:<requestId>`.
+ * This operation parses the command, calls the provider's `resolveApproval`, and returns
+ * a single-row result indicating whether the resolution succeeded.
+ */
+class ApproveToolCall(
+    session: Session,
+    override val statement: String,
+    dataAgentProvider: DataAgentProvider)
+  extends DataAgentOperation(session) with Logging {
+
+  override val shouldRunAsync: Boolean = false
+
+  override protected def runInternal(): Unit = {
+    setState(OperationState.RUNNING)
+
+    try {
+      val trimmed = statement.trim
+      val (requestId, approved) = if (trimmed.startsWith(ApproveToolCall.APPROVE_PREFIX)) {
+        (trimmed.substring(ApproveToolCall.APPROVE_PREFIX.length).trim, true)
+      } else if (trimmed.startsWith(ApproveToolCall.DENY_PREFIX)) {
+        (trimmed.substring(ApproveToolCall.DENY_PREFIX.length).trim, false)
+      } else {
+        throw new IllegalArgumentException(s"Invalid approval command: $trimmed")
+      }
+      if (requestId.isEmpty) {
+        throw new IllegalArgumentException("requestId cannot be empty")
+      }
+
+      val resolved = dataAgentProvider.resolveApproval(requestId, approved)
+      val action = if (approved) "approved" else "denied"
+      val node = ApproveToolCall.JSON.createObjectNode()
+      node.put("status", if (resolved) "ok" else "not_found")
+      node.put("action", action)
+      node.put("requestId", requestId)
+      val result = ApproveToolCall.JSON.writeValueAsString(node)
+
+      iter = new ArrayFetchIterator[Array[String]](Array(Array(result)))
+      setState(OperationState.FINISHED)
+    } catch {
+      onError()
+    }
+  }
+}
+
+object ApproveToolCall {
+  private val JSON = new ObjectMapper()
+  val APPROVE_PREFIX = "__approve:"
+  val DENY_PREFIX = "__deny:"
+
+  def isApprovalCommand(statement: String): Boolean = {
+    val trimmed = statement.trim
+    trimmed.startsWith(APPROVE_PREFIX) || trimmed.startsWith(DENY_PREFIX)

Review Comment:
Sure. The goal is to let a human approve or deny risky tool calls (for example a destructive SQL statement) without adding any new RPC: everything stays on the normal `executeStatement` / `fetchResults` path, so plain JDBC and Thrift clients work as-is.

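To illustrate what "plain JDBC works as-is" could look like from the client side, here is a minimal sketch. `ApprovalClient`, `command`, and `send` are hypothetical names that are not part of this PR; the sketch also assumes the driver exposes the single-row reply through `executeQuery` with the JSON string in column 1.

```scala
import java.sql.Connection

// Hypothetical client-side helper; not part of this PR.
object ApprovalClient {
  // Prefixes copied from the ApproveToolCall companion object in the diff.
  val ApprovePrefix = "__approve:"
  val DenyPrefix = "__deny:"

  /** Builds the control statement for a given decision. */
  def command(requestId: String, approved: Boolean): String = {
    val id = requestId.trim
    require(id.nonEmpty, "requestId cannot be empty")
    (if (approved) ApprovePrefix else DenyPrefix) + id
  }

  /**
   * Sends the decision over an existing JDBC connection and returns the
   * JSON reply, if any. Assumption: the reply is one row with one string column.
   */
  def send(conn: Connection, requestId: String, approved: Boolean): Option[String] = {
    val stmt = conn.createStatement()
    try {
      val rs = stmt.executeQuery(command(requestId, approved))
      try {
        if (rs.next()) Some(rs.getString(1)) else None
      } finally {
        rs.close()
      }
    } finally {
      stmt.close()
    }
  }
}
```

Because the decision is just another statement, the client needs no new API surface beyond building the right string.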
Here's the full round trip:

```mermaid
sequenceDiagram
    autonumber
    participant C as Client
    participant OM as DataAgentOperationManager
    participant ES as ExecuteStatement<br/>(async)
    participant P as Provider<br/>(agent + approval)
    participant AT as ApproveToolCall<br/>(sync)

    C->>OM: executeStatement("what is ...")
    OM->>ES: new ExecuteStatement
    ES->>P: run(request, onEvent)
    P-->>ES: content_delta events
    ES-->>C: fetchResults (JSON rows)

    Note over P: agent hits a risky tool call<br/>put(requestId, future)
    P-->>ES: approval_request(requestId)
    ES-->>C: fetchResults (approval row)
    Note over P: agent thread blocks on<br/>future.get(timeout)

    C->>OM: executeStatement("__approve:<requestId>")
    OM->>AT: isApprovalCommand ⇒ route to ApproveToolCall
    AT->>P: resolveApproval(requestId, true)
    P-->>P: future.complete(true)
    AT-->>C: {status:"ok", action, requestId}

    Note over P: agent thread resumes
    P-->>ES: tool_result / content_delta / finished
    ES-->>C: fetchResults (remaining rows)
```

A couple of choices worth calling out:

- `ApproveToolCall` is synchronous (`shouldRunAsync = false`) because it's just a control-plane ping. There's nothing to stream, so async would only add an extra state transition for no benefit.
- Staying on the `executeStatement` path means we don't touch the Thrift IDL and every existing client keeps working. The `__approve:` / `__deny:` prefixes are an easy sentinel since natural-language questions don't start with `__`.
- The pending-request map lives inside the provider, not in the operation manager, because the agent runtime is what actually blocks on the tool call: the future belongs next to the thing that's waiting on it. The operation layer just forwards `resolveApproval` and doesn't need to know about tool-call semantics or timeouts, which keeps the `DataAgentOperation*` classes free of agent-specific state.

Scope for this PR: what lands here is the SPI (`DataAgentProvider.resolveApproval`), the routing, and `ApproveToolCall` itself.
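
For orientation only, the provider-side parking step from the diagram (`put(requestId, future)`, `future.get(timeout)`, `future.complete(...)`) could be sketched roughly as follows. `PendingApprovals`, `awaitApproval`, and the `announce` callback are hypothetical names, and treating a timeout as a denial is an assumption; none of this is code from this PR.

```scala
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, TimeUnit, TimeoutException}

// Hypothetical provider-side registry of pending approvals; not part of this PR.
class PendingApprovals {
  private val pending = new ConcurrentHashMap[String, CompletableFuture[Boolean]]()

  /**
   * Called on the agent thread. Registers the future first, then lets the
   * caller announce the request (for example by emitting an approval_request
   * event), then blocks until a decision arrives or the timeout expires.
   */
  def awaitApproval(requestId: String, timeoutMs: Long)(announce: String => Unit): Boolean = {
    val future = new CompletableFuture[Boolean]()
    pending.put(requestId, future)
    try {
      announce(requestId)
      future.get(timeoutMs, TimeUnit.MILLISECONDS)
    } catch {
      case _: TimeoutException => false // assumption: a timeout counts as a denial
    } finally {
      pending.remove(requestId) // no-op if resolveApproval already removed it
    }
  }

  /** Called from the operation layer; false means unknown or expired requestId. */
  def resolveApproval(requestId: String, approved: Boolean): Boolean = {
    val future = pending.remove(requestId)
    future != null && future.complete(approved)
  }
}
```

Registering the future before announcing the request avoids a race in which the client's approval arrives before the map entry exists.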
The actual parking mechanism inside a real provider comes with the LLM-backed provider in the follow-up PR. Right now the default `resolveApproval` returns `false`, so an unknown or expired `requestId` surfaces as `{"status":"not_found"}` to the client.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
