[ https://issues.apache.org/jira/browse/BEAM-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Benjamin BENOIST updated BEAM-3754:
-----------------------------------
    Summary: KAFKA - Can't have commitOffsetsInFinalizeEnabled set to false 
with KafkaIO.readBytes()  (was: Can't have commitOffsetsInFinalizeEnabled set 
to false with KafkaIO.readBytes())

> KAFKA - Can't have commitOffsetsInFinalizeEnabled set to false with 
> KafkaIO.readBytes()
> ---------------------------------------------------------------------------------------
>
>                 Key: BEAM-3754
>                 URL: https://issues.apache.org/jira/browse/BEAM-3754
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>    Affects Versions: 2.3.0
>         Environment: Dataflow pipeline using Kafka as a Sink
>            Reporter: Benjamin BENOIST
>            Assignee: Raghu Angadi
>            Priority: Minor
>              Labels: patch
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Beam v2.3 introduces committing Kafka offsets when checkpoints are finalized,
> in order to reduce gaps and duplicate processing of records when a pipeline
> is restarted.
> _read()_ sets the _commitOffsetsInFinalizeEnabled_ property to false [by default|https://github.com/apache/beam/blob/release-2.3.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L307],
> but _readBytes()_ 
> [doesn't set it at all|https://github.com/apache/beam/blob/release-2.3.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L282],
> so building the transform fails with an exception:
> {noformat}
> Exception in thread "main" java.lang.IllegalStateException: Missing required properties: commitOffsetsInFinalizeEnabled
>          at org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Read$Builder.build(AutoValue_KafkaIO_Read.java:344)
>          at org.apache.beam.sdk.io.kafka.KafkaIO.readBytes(KafkaIO.java:291)
> {noformat}
> The property can be set to true with _commitOffsetsInFinalize()_, but there
> is no way to set it to false.
> Defining _readBytes()_ in terms of _read()_ would fix this and prevent the
> same kind of error when new required properties are added in the future:
> {code:java}
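> public static Read<byte[], byte[]> readBytes() {
>   // Start from read() so that every default it sets, including
>   // commitOffsetsInFinalizeEnabled = false, also applies here.
>   // Explicit type arguments are needed because read() is generic.
>   return KafkaIO.<byte[], byte[]>read()
>     .withKeyDeserializer(ByteArrayDeserializer.class)
>     .withValueDeserializer(ByteArrayDeserializer.class);
> }{code}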
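
The failure and the proposed fix can be reproduced without Beam or Kafka. The following is a hypothetical, simplified model, not Beam's code: _ReadSpec_, _Builder_, _readBytesBroken()_ and _readBytesFixed()_ are invented names, and the _build()_ check only imitates the "Missing required properties" validation that AutoValue generates for a non-nullable property that was never set.

```java
// Hypothetical, simplified model of KafkaIO.Read's AutoValue builder.
// It is NOT Beam code: names mirror the issue, the logic is illustrative only.
public class ReadBytesDemo {

  static final class ReadSpec {
    final String keyDeserializer;
    final String valueDeserializer;
    final boolean commitOffsetsInFinalizeEnabled;

    ReadSpec(String key, String value, boolean commit) {
      keyDeserializer = key;
      valueDeserializer = value;
      commitOffsetsInFinalizeEnabled = commit;
    }

    // Copies the current values into a new builder, similar to AutoValue's toBuilder().
    Builder toBuilder() {
      return new Builder()
          .setKeyDeserializer(keyDeserializer)
          .setValueDeserializer(valueDeserializer)
          .setCommitOffsetsInFinalizeEnabled(commitOffsetsInFinalizeEnabled);
    }
  }

  static final class Builder {
    private String keyDeserializer;
    private String valueDeserializer;
    // Boxed so that "never set" (null) can be told apart from false.
    private Boolean commitOffsetsInFinalizeEnabled;

    Builder setKeyDeserializer(String v) { keyDeserializer = v; return this; }

    Builder setValueDeserializer(String v) { valueDeserializer = v; return this; }

    Builder setCommitOffsetsInFinalizeEnabled(boolean v) {
      commitOffsetsInFinalizeEnabled = v;
      return this;
    }

    // Like a generated build(): rejects a required property that was never set.
    ReadSpec build() {
      if (commitOffsetsInFinalizeEnabled == null) {
        throw new IllegalStateException(
            "Missing required properties: commitOffsetsInFinalizeEnabled");
      }
      return new ReadSpec(keyDeserializer, valueDeserializer, commitOffsetsInFinalizeEnabled);
    }
  }

  // Mirrors read(): the one place where the default is set.
  static ReadSpec read() {
    return new Builder().setCommitOffsetsInFinalizeEnabled(false).build();
  }

  // Mirrors readBytes() in 2.3.0: a separate builder chain that omits the property.
  static ReadSpec readBytesBroken() {
    return new Builder()
        .setKeyDeserializer("ByteArrayDeserializer")
        .setValueDeserializer("ByteArrayDeserializer")
        .build();
  }

  // Mirrors the proposed fix: start from read() and override only the deserializers.
  static ReadSpec readBytesFixed() {
    return read().toBuilder()
        .setKeyDeserializer("ByteArrayDeserializer")
        .setValueDeserializer("ByteArrayDeserializer")
        .build();
  }

  public static void main(String[] args) {
    try {
      readBytesBroken();
    } catch (IllegalStateException e) {
      // prints "Missing required properties: commitOffsetsInFinalizeEnabled"
      System.out.println(e.getMessage());
    }
    // prints "false"
    System.out.println(readBytesFixed().commitOffsetsInFinalizeEnabled);
  }
}
```

Because _readBytesFixed()_ starts from _read()_, any default later added to _read()_ is inherited automatically, whereas the separate builder chain in _readBytesBroken()_ has to be kept in sync by hand.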



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
