aglinxinyuan opened a new pull request, #5451: URL: https://github.com/apache/texera/pull/5451
Closes #5450.

## Summary

Pin the behavior of `InputManager`, the per-worker input broker that was the only previously uncovered module in the `messaginglayer` package. Every sibling (`WorkerPort`, `OutputManager`, `AmberFIFOChannel`, `CongestionControl`, `FlowControl`, `NetworkInputGateway`, `OrderingEnforcer`, `RangeBasedShuffle`) already has a spec.

| Spec | Source class | Tests |
| --- | --- | --- |
| `InputManagerSpec` | `InputManager` | 15 |

The spec file name follows the `<srcClassName>Spec.scala` one-to-one convention.

## Behavior pinned

| Surface | Contract |
| --- | --- |
| `getAllPorts` (fresh) | empty `Set` |
| `addPort(portId, schema, Nil, Nil)` | registers a fresh `WorkerPort` with the supplied schema; no reader threads attached |
| second `addPort` for the same `portId` | silently no-ops (first call wins; the prior `WorkerPort` survives, including any mutation to `completed`) |
| `addPort` with mismatched URI / partitioning sizes | throws `AssertionError` |
| `getPort(portId)` | is a pure lookup: successive calls return the same `WorkerPort` instance |
| `getInputPortReaderThreads` | empty `Map` when every port was added with `urisToRead = List.empty` |
| `hasUnfinishedInput` (fresh) | `false` (no batch initialized) |
| `getCurrentTuple` (fresh) | `null` |
| `getCurrentTuple` (empty batch) | `null` |
| `hasUnfinishedInput` (empty batch) | `false` |
| `initBatch(channelId, batch)` | replaces `inputBatch`, resets the cursor, surfaces the channel id via `currentChannelId` |
| `getNextTuple` over a batch | yields each element in order; `getCurrentTuple` tracks the last yielded value; `hasUnfinishedInput` flips to `false` once the last element is consumed |
| `initBatch` called a second time | replaces the prior batch entirely and restarts the cursor (proven by yielding the new batch's first element fresh) |
| `isPortCompleted` (non-materialized) | reads through to `WorkerPort.completed`: `false` initially, `true` once flipped |
| `startInputPortReaderThreads` (no threads) | safe no-op |

## Notes

Materialization-reader-thread paths (non-empty `urisToRead`) are deliberately left out of this characterization. Constructing an `InputPortMaterializationReaderThread` eagerly evaluates `toPartitioner(partitioning, workerActorId)` and pulls in document-factory plumbing that is out of scope for a unit spec. Those code paths are exercised in higher-level worker / replay tests.

## Test plan

- [x] `WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.messaginglayer.InputManagerSpec`: 15 tests, all green
- [x] `scalafmtCheckAll`: clean
- [ ] CI to confirm
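
For readers without the Texera sources at hand, the batch-cursor rows of the behavior table (`initBatch`, `getNextTuple`, `getCurrentTuple`, `hasUnfinishedInput`, `currentChannelId`) can be illustrated with a small stand-in model. This is only a sketch under stated assumptions: `BatchCursorModel` and `BatchCursorDemo` are hypothetical names, tuples and channel ids are plain `String`s, and the internals are not taken from the real `InputManager`. In particular, returning `null` from `getCurrentTuple` after `initBatch` but before the first `getNextTuple` is a choice of this model; the table does not specify that state.

```scala
// Hypothetical stand-in for the batch-cursor part of InputManager.
// Method names mirror the PR's behavior table; types and internals are assumptions.
final class BatchCursorModel {
  private var inputBatch: Array[String] = null // no batch initialized yet
  private var cursor: Int = 0                  // index of the next element to yield
  private var current: String = null           // last yielded value, if any
  private var channel: String = null

  def currentChannelId: String = channel

  // Replaces the batch entirely, restarts the cursor, and records the channel id.
  def initBatch(channelId: String, batch: Array[String]): Unit = {
    channel = channelId
    inputBatch = batch
    cursor = 0
    current = null // assumption of this model: nothing is "current" until a tuple is yielded
  }

  // False when no batch exists, the batch is empty, or every element was consumed.
  def hasUnfinishedInput: Boolean =
    inputBatch != null && cursor < inputBatch.length

  // Yields elements in order; callers are expected to check hasUnfinishedInput first.
  def getNextTuple: String = {
    current = inputBatch(cursor)
    cursor += 1
    current
  }

  def getCurrentTuple: String = current
}

object BatchCursorDemo {
  def main(args: Array[String]): Unit = {
    val m = new BatchCursorModel

    // Fresh state: no unfinished input, no current tuple.
    assert(!m.hasUnfinishedInput)
    assert(m.getCurrentTuple == null)

    // In-order iteration; the current tuple tracks the last yielded value.
    m.initBatch("channel-1", Array("a", "b"))
    assert(m.currentChannelId == "channel-1")
    assert(m.getNextTuple == "a")
    assert(m.getCurrentTuple == "a")
    assert(m.hasUnfinishedInput)
    assert(m.getNextTuple == "b")
    assert(!m.hasUnfinishedInput)

    // A second initBatch replaces the prior batch and restarts the cursor.
    m.initBatch("channel-2", Array("x"))
    assert(m.getNextTuple == "x")

    // Empty batch: nothing current, nothing unfinished.
    m.initBatch("channel-2", Array.empty[String])
    assert(m.getCurrentTuple == null)
    assert(!m.hasUnfinishedInput)

    println("batch-cursor contract holds for the model")
  }
}
```

The model depends only on the Scala standard library, so it can be run without the Texera build. It demonstrates the shape of the pinned contract and is not a substitute for `InputManagerSpec`, which exercises the real class.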
