[jira] [Commented] (FLINK-18940) Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"
[ https://issues.apache.org/jira/browse/FLINK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17336114#comment-17336114 ]

Flink Jira Bot commented on FLINK-18940:
----------------------------------------

This issue was labeled "stale-major" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion.

> Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"
> ----------------------------------------
>
>                 Key: FLINK-18940
>                 URL: https://issues.apache.org/jira/browse/FLINK-18940
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.11.1
>         Environment: macOS Catalina
>                      java8
>                      scala 2.11
>            Reporter: jack sun
>            Priority: Major
>              Labels: stale-major
>         Attachments: 1.png, 2.png
>
>
> The source code is simple, like this. There is nothing wrong with kafkaStream: if
> I replace $"zone_id" with a column of Integer type, it's fine,
> but with a String column, boom ..
>
> val kafkaStream = env.addSource(
>   new FlinkKafkaConsumer011[BaseLog]("base_log", new BaseLogDeserializationSchema(), properties)
> )
> tenv.createTemporaryView("baselog", kafkaStream)
> tenv.from("baselog")
>   .select($"zone_id")
>   .toAppendStream[String]
>   .print()
>
> The schema is a case class, like this:
>
> case class BaseLog(
>   var timestamp: Long,
>   log_id: Int,
>   fork_id: String,
>   zone_id: String,
>   plat_id: String,
>   ..
> )
>
> SQL explain:
>
> == Abstract Syntax Tree ==
> LogicalProject(zone_id=[$4])
> +- LogicalTableScan(table=[[default_catalog, default_database, baselog]])
>
> == Optimized Logical Plan ==
> Calc(select=[zone_id])
> +- DataStreamScan(table=[[default_catalog, default_database, baselog]], fields=[timestamp, log_id, fork_id, zone_id, plat_id,.])
>
> == Physical Execution Plan ==
> Stage 1 : Data Source
>   content : Source: Custom Source
> Stage 2 : Operator
>   content : SourceConversion(table=[default_catalog.default_database.baselog], fields=[timestamp, log_id, fork_id, zone_id, plat_id,.])
>   ship_strategy : FORWARD
> Stage 3 : Operator
>   content : Calc(select=[zone_id])
>   ship_strategy : FORWARD
>
> The exception is below:
>
> public class SinkConversion$4 extends org.apache.flink.table.runtime.operators.TableStreamOperator
>     implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
>
>   private final Object[] references;
>   private transient org.apache.flink.table.data.util.DataFormatConverters.StringConverter converter$3;
>   private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
>
>   public SinkConversion$4(
>       Object[] references,
>       org.apache.flink.streaming.runtime.tasks.StreamTask task,
>       org.apache.flink.streaming.api.graph.StreamConfig config,
>       org.apache.flink.streaming.api.operators.Output output,
>       org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception {
>     this.references = references;
>     converter$3 = (((org.apache.flink.table.data.util.DataFormatConverters.StringConverter) references[0]));
>     this.setup(task, config, output);
>     if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
>       ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
>         .setProcessingTimeService(processingTimeService);
>     }
>   }
>
>   @Override
>   public void open() throws Exception {
>     super.open();
>   }
>
>   @Override
>   public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
>     org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue();
>
>     output.collect(outElement.replace((java.lang.String)
[jira] [Commented] (FLINK-18940) Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"
[ https://issues.apache.org/jira/browse/FLINK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17327691#comment-17327691 ]

Flink Jira Bot commented on FLINK-18940:
----------------------------------------

This major issue is unassigned and itself and all of its Sub-Tasks have not been updated for 30 days. So, it has been labeled "stale-major". If this ticket is indeed "major", please either assign yourself or give an update. Afterwards, please remove the label. In 7 days the issue will be deprioritized.
[jira] [Commented] (FLINK-18940) Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"
[ https://issues.apache.org/jira/browse/FLINK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1715#comment-1715 ]

Timo Walther commented on FLINK-18940:
--------------------------------------

Thanks, this is definitely a bug. I'm currently working on a FLIP that reworks the DataStream <-> Table interoperability that would eventually also fix this.
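The generated `SinkConversion$4` code quoted in this thread casts the incoming `RowData` (`in1`) straight to `BinaryStringData`, where one would presumably expect the string to be read out of the row's single field. The sketch below, in plain Java with no Flink dependency, illustrates that mismatch. All names in it (`RowLike`, `StringLike`, `SingleFieldRow`, `convertByCastingRow`, `convertByReadingField`) are hypothetical stand-ins, not Flink classes, and the field-read variant is only an assumption about what a working conversion might look like, not the actual fix. The input is typed as `Object` so that the example compiles and the failure shows up at runtime; in the thread the generated class already fails when it is instantiated.

```java
public class SinkConversionCastSketch {

    /** Hypothetical stand-in for an internal row type with positional fields. */
    interface RowLike {
        StringLike getString(int pos);
    }

    /** Hypothetical stand-in for an internal string representation. */
    static final class StringLike {
        private final String value;

        StringLike(String value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return value;
        }
    }

    /** A row that holds exactly one string field, like the output of a single-column select. */
    static final class SingleFieldRow implements RowLike {
        private final StringLike field;

        SingleFieldRow(String value) {
            this.field = new StringLike(value);
        }

        @Override
        public StringLike getString(int pos) {
            if (pos != 0) {
                throw new IndexOutOfBoundsException("pos=" + pos);
            }
            return field;
        }
    }

    /** Same shape as the failing expression: the row itself is treated as the string. */
    static String convertByCastingRow(Object in1) {
        return ((StringLike) in1).toString(); // ClassCastException when in1 is a row
    }

    /** Assumed alternative: read field 0 from the row, then convert it. */
    static String convertByReadingField(Object in1) {
        return ((RowLike) in1).getString(0).toString();
    }

    public static void main(String[] args) {
        Object in1 = new SingleFieldRow("zone-1");
        System.out.println(convertByReadingField(in1)); // prints "zone-1"
        try {
            convertByCastingRow(in1);
        } catch (ClassCastException e) {
            System.out.println("cast failed: row is not a string");
        }
    }
}
```

If `in1` were declared as `RowLike` instead of `Object`, `javac` would reject the cast to the final class `StringLike` at compile time, which is closer to the "Cannot cast" message in the issue title.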
[jira] [Commented] (FLINK-18940) Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"
[ https://issues.apache.org/jira/browse/FLINK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177435#comment-17177435 ]

jack sun commented on FLINK-18940:
----------------------------------

[~twalthr] Yep, I appended it to the description.
[jira] [Commented] (FLINK-18940) Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"
[ https://issues.apache.org/jira/browse/FLINK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177025#comment-17177025 ]

Timo Walther commented on FLINK-18940:
--------------------------------------

[~any] Could you explain the query/pipeline and schema that lead to this exception and generated code?

> Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"
> ----------------------------------------
>
>                 Key: FLINK-18940
>                 URL: https://issues.apache.org/jira/browse/FLINK-18940
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.11.1
>         Environment: macOS Catalina
>                      java8
>                      scala 2.11
>            Reporter: jack sun
>            Priority: Major
>
> public class SinkConversion$4 extends org.apache.flink.table.runtime.operators.TableStreamOperator
>     implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
>
>   private final Object[] references;
>   private transient org.apache.flink.table.data.util.DataFormatConverters.StringConverter converter$3;
>   private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
>
>   public SinkConversion$4(
>       Object[] references,
>       org.apache.flink.streaming.runtime.tasks.StreamTask task,
>       org.apache.flink.streaming.api.graph.StreamConfig config,
>       org.apache.flink.streaming.api.operators.Output output,
>       org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception {
>     this.references = references;
>     converter$3 = (((org.apache.flink.table.data.util.DataFormatConverters.StringConverter) references[0]));
>     this.setup(task, config, output);
>     if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
>       ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
>         .setProcessingTimeService(processingTimeService);
>     }
>   }
>
>   @Override
>   public void open() throws Exception {
>     super.open();
>   }
>
>   @Override
>   public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
>     org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue();
>
>     output.collect(outElement.replace((java.lang.String) converter$3.toExternal((org.apache.flink.table.data.binary.BinaryStringData) in1)));
>   }
>
>   @Override
>   public void close() throws Exception {
>     super.close();
>   }
> }
>
> 19:53:58,610 WARN  org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> SourceConversion(table=[default_catalog.default_database.baselog], fields=[timestamp, zone_id,..]) -> Calc(select=[zone_id]) -> SinkConversionToString -> Sink: Print to Std. Out (2/16) (8e0da69492bc45fffdbc17cb32c8f99b) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Could not instantiate generated class 'SinkConversion$4'
>     at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)
>     at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40)
>     at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:70)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:470)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
>     at