40u5 opened a new pull request, #182:
URL: https://github.com/apache/spark-connect-go/pull/182
### What changes were proposed in this pull request?
Fixes #126.
- Adds `Interrupt(ctx, interruptType, operationIdOrTag)` to the
`SparkConnectClient` interface (`spark/client/base/base.go`) and implements it
on `sparkConnectClientImpl` (`spark/client/client.go`), wrapping the
proto-level `Interrupt` RPC and branching on `INTERRUPT_TYPE_ALL` / `TAG` /
`OPERATION_ID`.
- `ExecutePlanClient` now remembers the caller's `context.Context`, the
operation id, and a back-reference to the client. `ToTable` arms a watcher
goroutine that, on `callerCtx.Done()`, sends `InterruptRequest{OPERATION_ID,
operationId}` using a detached 10 s context. The detached context is needed
because the caller's context is already cancelled at that point, so an RPC
issued on it would fail before reaching the server.
- Exposes `InterruptAll`, `InterruptTag`, and `InterruptOperation` on
`SparkSession` (`spark/sql/sparksession.go`) so users can also kill operations
explicitly, mirroring PySpark.
- Updates `spark/mocks/mock_executor.go` to satisfy the extended interface.
### Why are the changes needed?
As reported in #126, `dataframe.Collect(ctx)` previously dropped the gRPC
stream locally when the caller cancelled `ctx`, but the Spark executor kept
running the query until the server's idle timeout (5 min). This addresses all
three follow-ups from @grundprinzip's reply on the issue:
1. Surfaces `InterruptTag` on `SparkSession`.
2. Surfaces `InterruptOperation` on `SparkSession`.
3. Wires up automatic interrupt on client-side context cancellation.
### Does this PR introduce _any_ user-facing change?
Yes — additive only:
- `SparkSession` gains `InterruptAll(ctx)`, `InterruptTag(ctx, tag)`, and
`InterruptOperation(ctx, operationId)`, each returning the server-reported list
of interrupted operation ids.
- Cancelling the `context.Context` passed to `Collect` / `ExecutePlan` /
`ExecuteCommand` now causes the server-side operation to be interrupted instead
of running to idle timeout.
No backward-incompatible changes are expected for users. The
`SparkConnectClient` interface gains a method, which would break any external
implementation of that interface, but none are known; the in-tree
`mocks.TestExecutor` is updated in the same commit.
### How was this patch tested?
Added three unit tests in `spark/client/client_test.go`:
- `TestExecutePlanCancellingContextSendsInterrupt` — uses a `Recv`-blocking
stream and verifies that cancelling `ctx` triggers an `InterruptRequest` with
`INTERRUPT_TYPE_OPERATION_ID` and the matching operation id within 2 s.
- `TestInterruptAllCallsClient` — verifies `Interrupt(ALL, "")` plumbs
through.
- `TestInterruptOperationCallsClient` — verifies `Interrupt(OPERATION_ID,
opID)` plumbs through.
`go build ./...`, `go vet ./...`, `gofmt -l ./spark` (no diff), and `go test
./spark/...` (all packages PASS) run cleanly. The only failure under `go test
./...` is the existing `internal/tests/integration` suite, which requires
`SPARK_HOME` to be set and is unrelated to this change.