Re: Flink Kafka EXACTLY_ONCE causing KafkaException ByteArraySerializer is not an instance of Serializer
Hi Vikash, Sorry for the late reply. Is your version of Flink kafka *connector* 1.10.1 too? Actually it's a bug in the connector, so I think you need to upgrade the connector to 1.10.1 too, not just Flink itself. I tried Flink 1.10.0/1.10.1 + flink-kafka-connector 1.10.0 and indeed reproduced the bug. After upgrading flink-kafka-connector to 1.10.1, the error disappeared. On Fri, Jul 31, 2020 at 7:02 PM Vikash Dat wrote: > Thanks for the reply. I am currently using 1.10 but also saw it happens in > 1.10.1 when experimenting. I have not tried 1.11 since EMR only has up to > 1.10 at the moment. Are there any known work arounds? > > On Fri, Jul 31, 2020 at 02:42 Qingsheng Ren wrote: > >> Hi Vikash, >> >> It's a bug about classloader used in `abortTransaction()` method in >> `FlinkKafkaProducer`, Flink version 1.10.0. I think it has been fixed in >> 1.10.1 and 1.11 according to FLINK-16262. Are you using Flink version >> 1.10.0? >> >> >> Vikash Dat 于2020年7月30日周四 下午9:26写道: >> >>> Has anyone had success with using exactly_once in a kafka producer in >>> flink? >>> As of right now I don't think the code shown in the docs >>> ( >>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producer >>> ) >>> actually works. >>> >>> >>> >>> -- >>> Sent from: >>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >>> >> >> >> -- >> Best Regards, >> >> *Qingsheng Ren* >> >> Electrical and Computer Engineering >> Carnegie Mellon University >> >> Email: renqs...@gmail.com >> > -- Best Regards, *Qingsheng Ren* Electrical and Computer Engineering Carnegie Mellon University Email: renqs...@gmail.com
Re: Flink Kafka EXACTLY_ONCE causing KafkaException ByteArraySerializer is not an instance of Serializer
Thanks for the reply. I am currently using 1.10 but also saw it happens in 1.10.1 when experimenting. I have not tried 1.11 since EMR only has up to 1.10 at the moment. Are there any known work arounds? On Fri, Jul 31, 2020 at 02:42 Qingsheng Ren wrote: > Hi Vikash, > > It's a bug about classloader used in `abortTransaction()` method in > `FlinkKafkaProducer`, Flink version 1.10.0. I think it has been fixed in > 1.10.1 and 1.11 according to FLINK-16262. Are you using Flink version > 1.10.0? > > > Vikash Dat 于2020年7月30日周四 下午9:26写道: > >> Has anyone had success with using exactly_once in a kafka producer in >> flink? >> As of right now I don't think the code shown in the docs >> ( >> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producer >> ) >> actually works. >> >> >> >> -- >> Sent from: >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >> > > > -- > Best Regards, > > *Qingsheng Ren* > > Electrical and Computer Engineering > Carnegie Mellon University > > Email: renqs...@gmail.com >
Re: Flink Kafka EXACTLY_ONCE causing KafkaException ByteArraySerializer is not an instance of Serializer
Hi Vikash, It's a bug about classloader used in `abortTransaction()` method in `FlinkKafkaProducer`, Flink version 1.10.0. I think it has been fixed in 1.10.1 and 1.11 according to FLINK-16262. Are you using Flink version 1.10.0? Vikash Dat 于2020年7月30日周四 下午9:26写道: > Has anyone had success with using exactly_once in a kafka producer in > flink? > As of right now I don't think the code shown in the docs > ( > https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producer > ) > actually works. > > > > -- > Sent from: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ > -- Best Regards, *Qingsheng Ren* Electrical and Computer Engineering Carnegie Mellon University Email: renqs...@gmail.com
Re: Flink Kafka EXACTLY_ONCE causing KafkaException ByteArraySerializer is not an instance of Serializer
Has anyone had success with using exactly_once in a kafka producer in flink? As of right now I don't think the code shown in the docs (https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producer) actually works. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Flink Kafka EXACTLY_ONCE causing KafkaException ByteArraySerializer is not an instance of Serializer
I'm using Flink 1.10 and Kafka (AWS MSK) 2.2 and trying to do a simple app that consumes from one kafka topic and produces events into another topic. I would like to utilize the exactly_once semantic, however, I am experiencing the following error: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593) at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677) at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735) at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1099) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1036) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:432) at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:298) at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.(FlinkKafkaInternalProducer.java:76) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$2(FlinkKafkaProducer.java:1107) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290) at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka. common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374) at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392) at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:359) ... 12 more My producer is defined as new FlinkKafkaProducer[String]( appArgs.authTxnTopic, // target topic new KeyedSerializationSchemaWrapper[String]( new SimpleStringSchema() ), // serialization schema kafkaProdProps, // producer config, FlinkKafkaProducer.Semantic.EXACTLY_ONCE ) if I remove the exactly_once semantic as below, it works. new FlinkKafkaProducer[String]( appArgs.authTxnTopic, // target topic new KeyedSerializationSchemaWrapper[String]( new SimpleStringSchema()