Hi!

This seems to be caused by mismatched types between your source definition
and your workflow. If possible, could you describe the schema of your Kafka
source and paste your DataStream / Table / SQL code here?

Dhiru <userdh...@yahoo.com> 于2021年9月14日周二 上午3:49写道:

> I am not sure why, but when we try to receive data from Apache Kafka I get
> this error; it works fine for me when I run via Confluent Kafka.
>
> java.lang.ClassCastException: class java.lang.String cannot be cast to class 
> scala.Product (java.lang.String is in module java.base of loader 'bootstrap'; 
> scala.Product is in unnamed module of loader 'app')
>       at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>       at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:96)
>       at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>       at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>       at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>       at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>       at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>       at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>       at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>       at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>       at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>       at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>       at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>       at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
