[jira] [Updated] (FLINK-20181) RowData cannot cast to Tuple2

2020-11-16 Thread Xianxun Ye (Jira)


 [ https://issues.apache.org/jira/browse/FLINK-20181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xianxun Ye updated FLINK-20181:
---
Description: 
I want to emit CDC data from my own StreamOperator.

Flink version: 1.11.2, blink planner.
{code:java}
// code placeholder
getTableEnv().registerTableSource(
    "source",
    new StreamTableSource<RowData>() {
      TableSchema tableSchema = TableSchema.builder()
          .field("id", new AtomicDataType(new IntType(false)))
          .field("name", DataTypes.STRING())
          .field("type", DataTypes.STRING())
          .primaryKey("id")
          .build();

      @Override
      public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.addSource(new DebugSourceFunction(tableSchema.toRowDataType()));
      }

      @Override
      public TableSchema getTableSchema() {
        return tableSchema;
      }

      @Override
      public DataType getProducedDataType() {
        return getTableSchema().toRowDataType().bridgedTo(RowData.class);
      }
    }
);
sql("insert into Test.testdb.animal "
    + " SELECT id, name, type, '2020' as da, '11' as hr"
    + " from source"
);

class DebugSourceFunction extends RichParallelSourceFunction<RowData>
    implements ResultTypeQueryable<RowData> {

  DataType dataType;

  public DebugSourceFunction(DataType dataType) {
    this.dataType = dataType;
  }

  @Override
  @SuppressWarnings("unchecked")
  public TypeInformation<RowData> getProducedType() {
    return (TypeInformation<RowData>) createTypeInformation(dataType);
  }

  @Override
  public void run(SourceContext<RowData> ctx) throws Exception {
    ctx.collect(GenericRowData.ofKind(RowKind.INSERT, 1,
        StringData.fromString("monkey"), StringData.fromString("small")));
  }

  @Override
  public void cancel() {}

  public TypeInformation<?> createTypeInformation(DataType producedDataType) {
    final DataType internalDataType = DataTypeUtils.transform(
        producedDataType,
        TypeTransformations.TO_INTERNAL_CLASS);
    return fromDataTypeToTypeInfo(internalDataType);
  }
}

public class TestUpsertTableSink implements UpsertStreamTableSink<RowData>,
    OverwritableTableSink, PartitionableTableSink {

  @Override
  public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, RowData>> dataStream) {

    DataStream<RowData> returnStream = dataStream
        .map(
            (MapFunction<Tuple2<Boolean, RowData>, RowData>) value -> value.f1
        );
    // ..
    return returnStream
        .addSink(new DiscardingSink<>())
        .setParallelism(1);
  }
}
{code}
When I execute the SQL `insert into ...` statement, a ClassCastException occurs:
{code:java}
Caused by: java.lang.ClassCastException: org.apache.flink.table.data.GenericRowData cannot be cast to org.apache.flink.api.java.tuple.Tuple2
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at StreamExecCalc$8.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
{code}
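
For context, a minimal sketch, assuming the Flink 1.11 UpsertStreamTableSink contract where consumeDataStream is declared to receive Tuple2<Boolean, RowData> records (upsert/delete flag in f0, row in f1); the class name UpsertRecordShape is hypothetical. It illustrates why the `value -> value.f1` map in the sink above throws once bare GenericRowData elements, as in the stack trace, arrive instead of tuples:
{code:java}
// Illustration only; UpsertRecordShape is a made-up name, not part of the report.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;

public class UpsertRecordShape {
  public static void main(String[] args) {
    RowData row = GenericRowData.ofKind(
        RowKind.INSERT, 1, StringData.fromString("monkey"), StringData.fromString("small"));

    // Shape the upsert sink contract describes: flag in f0, row in f1.
    Tuple2<Boolean, RowData> expected = Tuple2.of(Boolean.TRUE, row);
    RowData unwrapped = expected.f1; // this is what "value -> value.f1" relies on
    System.out.println(unwrapped);

    // Shape that actually reaches StreamMap.processElement per the stack trace:
    Object actual = row;                          // a bare GenericRowData
    System.out.println(actual instanceof Tuple2); // false, so the cast in the MapFunction throws
  }
}
{code}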

[jira] [Updated] (FLINK-20181) RowData cannot cast to Tuple2

2020-11-16 Thread Dian Fu (Jira)


 [ https://issues.apache.org/jira/browse/FLINK-20181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dian Fu updated FLINK-20181:

Component/s: Table SQL / Planner

> RowData cannot cast to Tuple2
> -----------------------------
>
> Key: FLINK-20181
> URL: https://issues.apache.org/jira/browse/FLINK-20181
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Xianxun Ye
>Priority: Major
>



--
This message was sent by Atlassian Jira