aglinxinyuan opened a new issue, #5450: URL: https://github.com/apache/texera/issues/5450
## Background

`InputManager` in `engine/architecture/messaginglayer` is the only module in that package without a dedicated spec; every sibling (`WorkerPort`, `OutputManager`, `AmberFIFOChannel`, `CongestionControl`, `FlowControl`, `NetworkInputGateway`, `OrderingEnforcer`, `RangeBasedShuffle`) has one. `InputManager` brokers per-port input state on the worker side. It tracks `WorkerPort` instances per `PortIdentity`, exposes the current input batch via `hasUnfinishedInput` / `getNextTuple` / `getCurrentTuple`, and decides port completion either from the port's own `completed` flag or from an attached `InputPortMaterializationReaderThread`.

## Behavior to pin

| Surface | Contract |
| --- | --- |
| `getAllPorts` (fresh) | empty `Set` |
| `addPort(portId, schema, Nil, Nil)` | registers a fresh `WorkerPort` with the supplied schema; no reader threads attached |
| `addPort` called twice for the same `portId` | silently no-ops (first call wins; the second is ignored) |
| `addPort` with mismatched `urisToRead.size != partitionings.size` | throws `AssertionError` |
| `getPort(portId)` for a registered port | returns the `WorkerPort` previously inserted by `addPort` |
| `getInputPortReaderThreads` | empty `Map` when no port was added with URIs |
| `getCurrentTuple` (no batch initialized) | `null` |
| `getCurrentTuple` (empty batch initialized) | `null` |
| `getCurrentTuple` (after `getNextTuple` advanced) | the most recently returned tuple |
| `hasUnfinishedInput` (no batch initialized) | `false` |
| `hasUnfinishedInput` (mid-batch) | `true` until the iterator advances past the last index |
| `initBatch(channelId, batch)` | resets `currentChannelId`, replaces `inputBatch`, restarts the index |
| `getNextTuple` over a batch | yields each tuple in order; mutates the cursor so `getCurrentTuple` follows |
| `isPortCompleted` (non-materialized) | returns the `WorkerPort.completed` flag |
| `isPortCompleted` (a materialized port whose reader thread has not finished) | `false` |
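To make the table concrete, here is a minimal, self-contained Scala model of the contracts it lists. It is **not** Texera's `InputManager`: `PortIdentity`, `Schema`, `WorkerPort`, and `ReaderStub` are simplified hypothetical stand-ins, plain strings replace Texera's tuple type, and two details are assumptions chosen to match the table rather than taken from the source: the cursor sits at index `-1` right after `initBatch`, and a materialized port counts as completed once all of its reader stubs have finished.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Texera types; only the shape needed for the table is kept.
final case class PortIdentity(id: Int)
final case class Schema(fields: List[String])
final class WorkerPort(val schema: Schema) { var completed: Boolean = false }
// Hypothetical stand-in for InputPortMaterializationReaderThread: tracks completion only.
final class ReaderStub(val uri: String) { var finished: Boolean = false }

// Simplified model of the contracts above (an assumption-based sketch, not the real class).
final class InputManagerModel {
  private val ports = mutable.LinkedHashMap[PortIdentity, WorkerPort]()
  private val readers = mutable.HashMap[PortIdentity, List[ReaderStub]]()
  private var currentChannelId: String = null
  private var inputBatch: Array[String] = null
  private var currentIdx: Int = -1

  def getAllPorts: Set[PortIdentity] = ports.keySet.toSet
  def getPort(portId: PortIdentity): WorkerPort = ports(portId)
  def getInputPortReaderThreads: Map[PortIdentity, List[ReaderStub]] = readers.toMap
  def channelId: String = currentChannelId

  def addPort(
      portId: PortIdentity,
      schema: Schema,
      urisToRead: List[String],
      partitionings: List[String]
  ): Unit = {
    // Predef.assert throws java.lang.AssertionError on a size mismatch.
    assert(urisToRead.size == partitionings.size, "each URI needs a partitioning")
    // First call wins: a second addPort for the same id is silently ignored.
    if (!ports.contains(portId)) {
      ports(portId) = new WorkerPort(schema)
      // Reader stubs are attached only when the port has URIs to read.
      if (urisToRead.nonEmpty) readers(portId) = urisToRead.map(uri => new ReaderStub(uri))
    }
  }

  def initBatch(channelId: String, batch: Array[String]): Unit = {
    currentChannelId = channelId
    inputBatch = batch
    currentIdx = -1 // cursor sits before the first tuple
  }

  // True while at least one tuple remains after the cursor.
  def hasUnfinishedInput: Boolean =
    inputBatch != null && currentIdx + 1 < inputBatch.length

  // Advances the cursor, so getCurrentTuple follows the returned tuple.
  def getNextTuple(): String = {
    currentIdx += 1
    inputBatch(currentIdx)
  }

  // null when no batch exists, the batch is empty, or the cursor has not advanced yet.
  def getCurrentTuple: String =
    if (inputBatch == null || currentIdx < 0 || currentIdx >= inputBatch.length) null
    else inputBatch(currentIdx)

  // Materialized ports defer to their readers; others use the WorkerPort flag.
  def isPortCompleted(portId: PortIdentity): Boolean = readers.get(portId) match {
    case Some(threads) => threads.forall(_.finished)
    case None          => ports(portId).completed
  }
}
```

Under these assumptions, after `initBatch("ch-1", Array("t1", "t2"))` the model reports `hasUnfinishedInput == true` and `getCurrentTuple == null`; one `getNextTuple()` call returns `"t1"`, after which `getCurrentTuple` also returns `"t1"`. Keeping the cursor in a single index field is what lets `getCurrentTuple` "follow" `getNextTuple` without a second piece of state.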
## Scope

- New spec file: `InputManagerSpec.scala` (matches the `<srcClassName>Spec.scala` convention).
- No production-code changes.
