aglinxinyuan opened a new pull request, #5451:
URL: https://github.com/apache/texera/pull/5451

   Closes #5450.
   
   ## Summary
   
   Pin the behavior of `InputManager`, the per-worker input broker that was the 
only previously uncovered module in the `messaginglayer` package. Every sibling 
(`WorkerPort`, `OutputManager`, `AmberFIFOChannel`, `CongestionControl`, 
`FlowControl`, `NetworkInputGateway`, `OrderingEnforcer`, `RangeBasedShuffle`) 
already has a spec.
   
   | Spec | Source class | Tests |
   | --- | --- | --- |
   | `InputManagerSpec` | `InputManager` | 15 |
   
   Spec file name follows the `<srcClassName>Spec.scala` one-to-one convention.
   
   ## Behavior pinned
   
   | Surface | Contract |
   | --- | --- |
   | `getAllPorts` (fresh) | empty `Set` |
   | `addPort(portId, schema, Nil, Nil)` | registers a fresh `WorkerPort` with the supplied schema; no reader threads attached |
   | second `addPort` for the same `portId` | silently no-ops (first call wins; the prior `WorkerPort` survives, including any mutation to `completed`) |
   | `addPort` with mismatched URI / partitioning sizes | throws `AssertionError` |
   | `getPort(portId)` | is a pure lookup: successive calls return the same `WorkerPort` instance |
   | `getInputPortReaderThreads` | empty `Map` when every port was added with `urisToRead = List.empty` |
   | `hasUnfinishedInput` (fresh) | `false` (no batch initialized) |
   | `getCurrentTuple` (fresh) | `null` |
   | `getCurrentTuple` (empty batch) | `null` |
   | `hasUnfinishedInput` (empty batch) | `false` |
   | `initBatch(channelId, batch)` | replaces `inputBatch`, resets the cursor, surfaces the channel id via `currentChannelId` |
   | `getNextTuple` over a batch | yields each element in order; `getCurrentTuple` tracks the last yielded value; `hasUnfinishedInput` flips to `false` once the last element is consumed |
   | `initBatch` called a second time | replaces the prior batch entirely and restarts the cursor (proven by yielding the new batch's first element fresh) |
   | `isPortCompleted` (non-materialized) | reads through to `WorkerPort.completed`: `false` initially, `true` once flipped |
   | `startInputPortReaderThreads` (no threads) | safe no-op |
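
   For readers who want the contracts in the table above in executable form, here is a minimal, self-contained sketch. It does not use the real Texera classes: `ToyPort`, `ToyInputManager`, and `ToyInputManagerDemo` are hypothetical stand-ins, port ids are plain `Int`s, tuples are plain `String`s, and the cursor representation is an assumption of the sketch, not a description of the actual `InputManager` internals.

```scala
import scala.collection.mutable

// Hypothetical stand-ins (NOT the Texera classes): they model only the
// contracts listed in the table, with Int port ids and String "tuples".
final class ToyPort(val schema: String) {
  var completed: Boolean = false
}

final class ToyInputManager {
  private val ports = mutable.HashMap.empty[Int, ToyPort]
  private var inputBatch: Array[String] = null // null until the first initBatch
  private var cursor: Int = -1                 // index of the last yielded element
  var currentChannelId: String = null

  def getAllPorts: Set[Int] = ports.keySet.toSet

  def addPort(portId: Int, schema: String, uris: List[String], partitionings: List[String]): Unit = {
    // Mismatched list sizes fail fast with java.lang.AssertionError.
    assert(uris.size == partitionings.size)
    // First call wins: a second addPort for the same id leaves the port untouched.
    if (!ports.contains(portId)) {
      ports(portId) = new ToyPort(schema)
    }
  }

  def getPort(portId: Int): ToyPort = ports(portId)

  def isPortCompleted(portId: Int): Boolean = ports(portId).completed

  def initBatch(channelId: String, batch: Array[String]): Unit = {
    currentChannelId = channelId
    inputBatch = batch
    cursor = -1 // restart the cursor for the new batch
  }

  def hasUnfinishedInput: Boolean =
    inputBatch != null && cursor + 1 < inputBatch.length

  def getNextTuple: String = {
    cursor += 1
    inputBatch(cursor)
  }

  // Assumption of this sketch: null before the first yield and for empty batches.
  def getCurrentTuple: String =
    if (inputBatch == null || cursor < 0) null else inputBatch(cursor)
}

object ToyInputManagerDemo {
  def main(args: Array[String]): Unit = {
    val m = new ToyInputManager
    assert(m.getAllPorts.isEmpty && !m.hasUnfinishedInput && m.getCurrentTuple == null)

    m.addPort(0, "schemaA", Nil, Nil)
    val first = m.getPort(0)
    first.completed = true
    m.addPort(0, "schemaB", Nil, Nil) // silently ignored
    assert((m.getPort(0) eq first) && m.getPort(0).schema == "schemaA" && m.isPortCompleted(0))

    m.initBatch("ch-1", Array("a", "b"))
    assert(m.getNextTuple == "a" && m.getCurrentTuple == "a" && m.hasUnfinishedInput)
    assert(m.getNextTuple == "b" && !m.hasUnfinishedInput)

    m.initBatch("ch-2", Array("x")) // replaces the batch and restarts the cursor
    assert(m.currentChannelId == "ch-2" && m.getNextTuple == "x")
  }
}
```

   The sketch keeps the port registry and the batch cursor in one class only to mirror the table; the real spec exercises `InputManager` directly.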
   
   ## Notes
   
   Materialization-reader-thread paths (non-empty `urisToRead`) are 
deliberately left out of this characterization. Constructing an 
`InputPortMaterializationReaderThread` eagerly evaluates 
`toPartitioner(partitioning, workerActorId)` and pulls in document-factory 
plumbing that is out of scope for a unit spec — those code paths are exercised 
in higher-level worker / replay tests.
   
   ## Test plan
   
   - [x] `WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.messaginglayer.InputManagerSpec`: 15 tests, all green
   - [x] `scalafmtCheckAll`: clean
   - [ ] CI to confirm

