Hi,

Changelog mode is a Table API concept. StreamTableEnvironment#fromChangelogStream interprets a DataStream<Row> as a changelog stream and converts it into a Table. For the Kafka connector, the changelog mode depends on the format used.
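To make the fromChangelogStream suggestion concrete, here is a minimal sketch, not taken from your job. The class name, the view name, the field names (id, name, phone) and the sample rows are assumptions for illustration only; the calls themselves (Row.ofKind, fromCollection, fromChangelogStream) are part of the public Flink API.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class ChangelogStreamSketch {

    // Hypothetical sample changelog: one insert followed by an update,
    // expressed as an UPDATE_BEFORE / UPDATE_AFTER pair.
    static List<Row> sampleChangelog() {
        return Arrays.asList(
                Row.ofKind(RowKind.INSERT, 1, "test", "123-456-789"),
                Row.ofKind(RowKind.UPDATE_BEFORE, 1, "test", "123-456-789"),
                Row.ofKind(RowKind.UPDATE_AFTER, 1, "test", "987-654-321"));
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Field names and types are assumed; use the ones from your own rowTypeInfo.
        DataStream<Row> changelog = env.fromCollection(
                sampleChangelog(),
                Types.ROW_NAMED(
                        new String[] {"id", "name", "phone"},
                        Types.INT, Types.STRING, Types.STRING));

        // fromChangelogStream accepts rows of any RowKind,
        // whereas fromDataStream expects insert-only records.
        Table table = tableEnv.fromChangelogStream(changelog);

        tableEnv.createTemporaryView("customer_changes", table);
        tableEnv.executeSql("SELECT * FROM customer_changes").print();
    }
}
```

Passing the same stream to fromDataStream instead is what raises the "Conversion expects insert-only records" error quoted below, since that method only accepts INSERT rows.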

Best,
Hang

liu ron <ron9....@gmail.com> wrote on Sun, Aug 13, 2023 at 22:06:

> Hi,
>
> After a deep dive into the source code, I guess you are using the
> StreamTableEnvironment#fromDataStream method, which only supports
> insert-only messages. For your case, I think you should use
> StreamTableEnvironment#fromChangelogStream[1], which supports consuming
> update rows.
>
> [1]
> https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java#L317
>
> Best,
> Ron
>
> 完结篇 <2366123...@qq.com> wrote on Sat, Aug 12, 2023 at 02:29:
>
>> Flink: 1.15.2
>>
>> I am now going to change the data stream from *DataStream<String>* to
>> *DataStream<Row>*.
>>
>> This is already implemented (*insert-only works fine*), but it fails when
>> DataStream<String> contains *update* information.
>>
>> The error is:
>> *Caused by: org.apache.flink.util.FlinkRuntimeException: Error during
>> input conversion. Conversion expects insert-only records but DataStream API
>> record contains: UPDATE_BEFORE*
>>     at org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:121)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>     *at com.test.KafkaFlink$2.flatMap(KafkaFlink.java:180)*
>>     at com.test.KafkaFlink$2.flatMap(KafkaFlink.java:160)
>>     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:68)
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:62)
>>     at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:103)
>>     at com.test.KafkaFlink$1.processElement(KafkaFlink.java:118)
>>     at com.test.KafkaFlink$1.processElement(KafkaFlink.java:107)
>>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> *KafkaFlink.java, lines 179-180:*
>>
>> Row before = ChangelogToRowUtil.extractRow(RowKind.UPDATE_BEFORE,
>> beforeObject, rowTypeInfo);
>> collector.collect(before);
>>
>> The before data output is -U[1, test, 123-456-789]
>>
>> I would like to know: how do I convert a stream containing *update* data
>> from *DataStream<String>* to *DataStream<Row>*?
>>
>