aglinxinyuan opened a new issue, #5450:
URL: https://github.com/apache/texera/issues/5450

   ## Background
   
   `InputManager` in `engine/architecture/messaginglayer` is the only module in 
that package without a dedicated spec — every sibling (`WorkerPort`, 
`OutputManager`, `AmberFIFOChannel`, `CongestionControl`, `FlowControl`, 
`NetworkInputGateway`, `OrderingEnforcer`, `RangeBasedShuffle`) has one.
   
   `InputManager` brokers per-port input state on the worker side: it tracks 
`WorkerPort` instances per `PortIdentity`, exposes the current input batch via 
`hasUnfinishedInput` / `getNextTuple` / `getCurrentTuple`, and decides port 
completion either from the port's own `completed` flag or from an attached 
`InputPortMaterializationReaderThread`.
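
A minimal sketch of the per-port bookkeeping described above, to make the registration and completion rules concrete. Everything here is a simplified stand-in and not the Texera implementation: `PortRegistry`, `PortId`, and `Port` are hypothetical names, schemas and URIs are plain strings, and a reader thread is modelled as a boolean "finished" flag. The order of the size assertion relative to the duplicate check, and the rule that an attached reader alone decides completion, are assumptions.

```scala
import scala.collection.mutable

// Hypothetical stand-ins; the real WorkerPort / PortIdentity carry more state.
final case class PortId(id: Int)
final class Port(val schema: String) { var completed: Boolean = false }

// Simplified model of the registry half of an input manager.
final class PortRegistry {
  private val ports = mutable.HashMap.empty[PortId, Port]
  // Stand-in for reader threads: port -> "has the reader finished?".
  private val readerFinished = mutable.HashMap.empty[PortId, Boolean]

  def addPort(
      id: PortId,
      schema: String,
      urisToRead: List[String],
      partitionings: List[String]
  ): Unit = {
    // Scala's assert throws java.lang.AssertionError when the condition fails.
    assert(urisToRead.size == partitionings.size)
    // First registration wins; a repeated call changes nothing.
    if (!ports.contains(id)) {
      ports(id) = new Port(schema)
      if (urisToRead.nonEmpty) readerFinished(id) = false
    }
  }

  def getAllPorts: Set[PortId] = ports.keySet.toSet

  def getPort(id: PortId): Port = ports(id)

  // Assumption: a port with a reader attached is complete once the reader
  // finishes; otherwise the port's own flag decides.
  def isPortCompleted(id: PortId): Boolean =
    readerFinished.get(id) match {
      case Some(finished) => finished
      case None           => ports(id).completed
    }
}
```

In this model a port added with `Nil, Nil` reports whatever its `completed` flag holds, while a port added with one URI and one partitioning reports `false` until its reader is marked finished.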
   
   ## Behavior to pin
   
   | Surface | Contract |
   | --- | --- |
   | `getAllPorts` (fresh) | empty `Set` |
   | `addPort(portId, schema, Nil, Nil)` | registers a fresh `WorkerPort` with the supplied schema; no reader threads attached |
   | `addPort` called twice for the same `portId` | silently no-ops (first call wins; the second is ignored) |
   | `addPort` with mismatched `urisToRead.size != partitionings.size` | throws `AssertionError` |
   | `getPort(portId)` for a registered port | returns the `WorkerPort` previously inserted by `addPort` |
   | `getInputPortReaderThreads` | empty `Map` when no port was added with URIs |
   | `getCurrentTuple` (no batch initialized) | `null` |
   | `getCurrentTuple` (empty batch initialized) | `null` |
   | `getCurrentTuple` (after `getNextTuple` advanced) | the most-recently returned tuple |
   | `hasUnfinishedInput` (no batch initialized) | `false` |
   | `hasUnfinishedInput` (mid-batch) | `true` until the iterator advances past the last index |
   | `initBatch(channelId, batch)` | resets `currentChannelId`, replaces `inputBatch`, restarts the index |
   | `getNextTuple` over a batch | yields each tuple in order; mutates the cursor so `getCurrentTuple` follows |
   | `isPortCompleted` (non-materialized) | returns the `WorkerPort.completed` flag |
   | `isPortCompleted` (a materialized port whose reader thread has not finished) | `false` |
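
The batch-cursor rows of the table can be read as a small state machine. The sketch below models it with a hypothetical `BatchCursor` class in which tuples and channel identities are plain strings; it is not the Texera `InputManager`. Returning `null` from `getCurrentTuple` after `initBatch` but before the first `getNextTuple` is an assumption, since the table does not cover that case.

```scala
// Hypothetical, simplified model of the batch cursor; not the real class.
final class BatchCursor {
  private var inputBatch: Array[String] = null
  private var currentInputIdx: Int = -1
  private var currentChannelId: String = null // stand-in for a channel identity

  // Replace the batch, remember the channel, and restart the index.
  def initBatch(channelId: String, batch: Array[String]): Unit = {
    currentChannelId = channelId
    inputBatch = batch
    currentInputIdx = -1
  }

  // True while at least one tuple has not yet been handed out.
  def hasUnfinishedInput: Boolean =
    inputBatch != null && currentInputIdx + 1 < inputBatch.length

  // Advance the cursor and return the tuple it now points at.
  // Callers are expected to check hasUnfinishedInput first.
  def getNextTuple: String = {
    currentInputIdx += 1
    inputBatch(currentInputIdx)
  }

  // null when there is no batch, the batch is empty, or (assumption)
  // nothing has been read yet; otherwise the last tuple handed out.
  def getCurrentTuple: String =
    if (inputBatch == null || inputBatch.isEmpty || currentInputIdx < 0) null
    else inputBatch(currentInputIdx)
}
```

A spec for the real class would presumably walk the same sequence: a fresh instance, an empty batch, then a multi-tuple batch, checking `getCurrentTuple` and `hasUnfinishedInput` after each step.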
   
   ## Scope
   
   - New spec file: `InputManagerSpec.scala` (matches the 
`<srcClassName>Spec.scala` convention).
   - No production-code changes.

