[ https://issues.apache.org/jira/browse/BEAM-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384330#comment-16384330 ]
Raghu Angadi commented on BEAM-3754:
------------------------------------

Just saw this. You are correct. Thanks for reporting it. They should never have duplicated code. I am not even sure when `{{rawBytes()}}` was added. Sent a fix in https://github.com/apache/beam/pull/4792

> KAFKA - Can't set commitOffsetsInFinalizeEnabled to false with KafkaIO.readBytes()
> ----------------------------------------------------------------------------------
>
> Key: BEAM-3754
> URL: https://issues.apache.org/jira/browse/BEAM-3754
> Project: Beam
> Issue Type: Bug
> Components: io-java-kafka
> Affects Versions: 2.3.0
> Environment: Dataflow pipeline using Kafka as a Sink
> Reporter: Benjamin BENOIST
> Assignee: Raghu Angadi
> Priority: Minor
> Labels: patch
> Original Estimate: 2h
> Time Spent: 10m
> Remaining Estimate: 1h 50m
>
> Beam v2.3 introduces finalized offsets, in order to reduce the gaps or duplicate processing of records while restarting a pipeline.
> _read()_ sets this parameter to false [by default|https://github.com/apache/beam/blob/release-2.3.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L307]
> but _readBytes()_ [doesn't|https://github.com/apache/beam/blob/release-2.3.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L282],
> thus creating an exception:
> {noformat}
> Exception in thread "main" java.lang.IllegalStateException: Missing required properties: commitOffsetsInFinalizeEnabled
>     at org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Read$Builder.build(AutoValue_KafkaIO_Read.java:344)
>     at org.apache.beam.sdk.io.kafka.KafkaIO.readBytes(KafkaIO.java:291){noformat}
> The parameter can be set to true with _commitOffsetsInFinalize()_ but never to false.
> Using _read()_ in the definition of _readBytes()_ could prevent this kind of error in the future:
> {code:java}
> public static Read<byte[], byte[]> readBytes() {
>   return read()
>       .setKeyDeserializer(ByteArrayDeserializer.class)
>       .setValueDeserializer(ByteArrayDeserializer.class)
>       .build();
> }{code}
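
For readers who want to see the failure mode in isolation, here is a minimal, self-contained sketch of the pattern described in the report. It is not Beam's code: `ReaderConfigSketch`, `ReadConfig`, and the factory names are hypothetical stand-ins, and the hand-written builder only imitates how a generated builder rejects an unset required property. It shows one factory that duplicates the defaults and misses a newly added property, and one that delegates to the generic factory.

```java
public class ReaderConfigSketch {

  /** Hypothetical immutable config; a stand-in, not Beam's KafkaIO.Read. */
  public static final class ReadConfig {
    public final String keyDeserializer;
    public final String valueDeserializer;
    public final boolean commitOffsetsInFinalize;

    private ReadConfig(Builder b) {
      this.keyDeserializer = b.keyDeserializer;
      this.valueDeserializer = b.valueDeserializer;
      this.commitOffsetsInFinalize = b.commitOffsetsInFinalize;
    }

    /** Returns a builder pre-populated with this config's values. */
    public Builder toBuilder() {
      return new Builder()
          .setKeyDeserializer(keyDeserializer)
          .setValueDeserializer(valueDeserializer)
          .setCommitOffsetsInFinalize(commitOffsetsInFinalize);
    }

    public static final class Builder {
      private String keyDeserializer;          // optional in this sketch
      private String valueDeserializer;        // optional in this sketch
      private Boolean commitOffsetsInFinalize; // required; null means "never set"

      public Builder setKeyDeserializer(String v) { keyDeserializer = v; return this; }
      public Builder setValueDeserializer(String v) { valueDeserializer = v; return this; }
      public Builder setCommitOffsetsInFinalize(boolean v) { commitOffsetsInFinalize = v; return this; }

      public ReadConfig build() {
        // Imitates a generated builder that refuses to build without required properties.
        if (commitOffsetsInFinalize == null) {
          throw new IllegalStateException("Missing required properties: commitOffsetsInFinalize");
        }
        return new ReadConfig(this);
      }
    }
  }

  /** Generic factory: the single place where defaults are set. */
  public static ReadConfig read() {
    return new ReadConfig.Builder().setCommitOffsetsInFinalize(false).build();
  }

  /** Duplicated factory: repeats the setup but misses the required property. */
  public static ReadConfig readBytesDuplicated() {
    return new ReadConfig.Builder()
        .setKeyDeserializer("ByteArrayDeserializer")
        .setValueDeserializer("ByteArrayDeserializer")
        .build(); // throws IllegalStateException
  }

  /** Delegating factory: inherits every default from read(). */
  public static ReadConfig readBytesDelegating() {
    return read().toBuilder()
        .setKeyDeserializer("ByteArrayDeserializer")
        .setValueDeserializer("ByteArrayDeserializer")
        .build();
  }

  public static void main(String[] args) {
    try {
      readBytesDuplicated();
    } catch (IllegalStateException e) {
      System.out.println("duplicated factory failed: " + e.getMessage());
    }
    ReadConfig cfg = readBytesDelegating();
    System.out.println("delegating factory ok, commitOffsetsInFinalize=" + cfg.commitOffsetsInFinalize);
  }
}
```

With delegation, any default added to the generic factory later is picked up automatically by the specialised one, which is the point of the suggestion quoted above.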