[ https://issues.apache.org/jira/browse/BEAM-1177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15758852#comment-15758852 ]
Amit Sela commented on BEAM-1177:
---------------------------------

Instead of simply emitting {{Iterable<byte[]>}} per partition, I'll emit {{Tuple2<Iterable<byte[]>, Metadata>}} to be able to report read count and watermark per batch.

{code}
class Metadata {
  private final long numRecords;
  private final Instant watermark;

  public Metadata(long numRecords, Instant watermark) {
    this.numRecords = numRecords;
    this.watermark = watermark;
  }

  public long getNumRecords() {
    return numRecords;
  }

  public Instant getWatermark() {
    return watermark;
  }
}
{code}

> Input DStream "bundles" should be in serialized form and include relevant
> metadata.
> -----------------------------------------------------------------------------------
>
>                 Key: BEAM-1177
>                 URL: https://issues.apache.org/jira/browse/BEAM-1177
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: Amit Sela
>            Assignee: Amit Sela
>
> Currently, the input partitions hold "bundles" of read elements within the
> {{mapWithStateDStream}} used for the read.
> Since this is automatically shuffled, user-data (the read elements) should be
> serialized using coders to avoid breaking (if user-data is not {{Kryo}}
> serializable).
> Even after BEAM-848 would complete, the resulting {{MapWithStateDStream}}
> would be checkpointed periodically and so it would still have to remain in
> serialized form.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
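
For illustration only, the pairing proposed in the comment above (serialized records plus per-batch metadata) might look roughly like the sketch below. It is not the Spark runner's code. It assumes plain JDK types so that it runs on its own: {{java.time.Instant}} stands in for the {{Instant}} used in {{Metadata}}, a hypothetical {{Bundle}} class stands in for Spark's {{Tuple2}}, and a UTF-8 {{encode}} helper stands in for a Beam coder.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SerializedBundleSketch {

  /** Per-batch metadata, same shape as the Metadata class in the comment. */
  static final class Metadata {
    private final long numRecords;
    private final Instant watermark; // assumption: java.time.Instant

    Metadata(long numRecords, Instant watermark) {
      this.numRecords = numRecords;
      this.watermark = watermark;
    }

    long getNumRecords() { return numRecords; }

    Instant getWatermark() { return watermark; }
  }

  /** Hypothetical stand-in for Tuple2<Iterable<byte[]>, Metadata>. */
  static final class Bundle {
    final List<byte[]> records;
    final Metadata metadata;

    Bundle(List<byte[]> records, Metadata metadata) {
      this.records = records;
      this.metadata = metadata;
    }
  }

  /** Hypothetical stand-in for a coder: encodes a String as UTF-8 bytes. */
  static byte[] encode(String element) {
    return element.getBytes(StandardCharsets.UTF_8);
  }

  /** Encodes every read element and attaches the read count and watermark. */
  static Bundle toBundle(List<String> elements, Instant watermark) {
    List<byte[]> encoded = new ArrayList<>();
    for (String element : elements) {
      encoded.add(encode(element));
    }
    return new Bundle(encoded, new Metadata(encoded.size(), watermark));
  }

  public static void main(String[] args) {
    Bundle bundle = toBundle(Arrays.asList("a", "b", "c"), Instant.ofEpochMilli(1000L));
    System.out.println(bundle.metadata.getNumRecords()
        + " records, watermark " + bundle.metadata.getWatermark());
  }
}
```

Because only {{byte[]}} payloads and a small metadata object cross the shuffle or checkpoint boundary in this sketch, the user's element type never needs to be {{Kryo}} serializable.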