Hi,

Changelog mode is the concept of the table API. You can get a changelog
stream from StreamTableEnvironment#fromChangelogStream.
For kafka connector, its changelog mode depends on the used format.

Best,
Hang

liu ron <ron9....@gmail.com> 于2023年8月13日周日 22:06写道:

> Hi,
>
> After deep dive into the source code, I guess you use the
> StreamTableEnvironment#fromDataStream method, this method only supports the
> insert-only message. According to your case, I think you should use the
> StreamTableEnvironment#fromChangelogStream[1], it supports consuming update
> row.
>
> [1]
> https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java#L317
>
> Best,
> Ron
>
> 完结篇 <2366123...@qq.com> 于2023年8月12日周六 02:29写道:
>
>> Flink:1.15.2
>>
>> I am now going to change the data stream from *DataStream<String>* to
>> *DataStream<ROW>*
>>
>> Already implemented (*insert only works fine*), but when
>> DataStream<String> contains *update *information
>>
>> The error is:
>> *Caused by: org.apache.flink.util.FlinkRuntimeException: Error during
>> input conversion. Conversion expects insert-only records but DataStream API
>> record contains: UPDATE_BEFORE*
>> at
>> org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:121)
>> at
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>> at
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
>> at
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>> at
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>> *at com.test.KafkaFlink$2.flatMap(KafkaFlink.java:180)*
>> at com.test.KafkaFlink$2.flatMap(KafkaFlink.java:160)
>> at
>> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
>> at
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>> at
>> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:68)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:62)
>> at
>> org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:103)
>> at com.test.KafkaFlink$1.processElement(KafkaFlink.java:118)
>> at com.test.KafkaFlink$1.processElement(KafkaFlink.java:107)
>> at
>> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>> at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>> at
>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>> at
>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>> at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>> at
>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>> at
>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>> at java.lang.Thread.run(Thread.java:748)
>>
>> *kafkaflink.java:179-180 lines of code*
>>
>> Row before = ChangelogToRowUtil.extractRow(RowKind.UPDATE_BEFORE,
>> beforeObject, rowTypeInfo);
>> collector. collect(before);
>>
>> The before data output is -U[1, test, 123-456-789]
>>
>> I would like to know : How to convert the stream containing *update* data
>> from *DataStream<String>* to *DataStream<ROW>*
>>
>

Reply via email to