[jira] [Commented] (FLINK-18940) Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"

2021-04-29 Thread Flink Jira Bot (Jira)


[ https://issues.apache.org/jira/browse/FLINK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17336114#comment-17336114 ]

Flink Jira Bot commented on FLINK-18940:


This issue was labeled "stale-major" 7 days ago and has not received any updates, 
so it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> Cannot cast "org.apache.flink.table.data.RowData" to 
> "org.apache.flink.table.data.binary.BinaryStringData"
> --
>
> Key: FLINK-18940
> URL: https://issues.apache.org/jira/browse/FLINK-18940
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.11.1
> Environment: macOS Catalina
> java8
> scala 2.11
>Reporter: jack sun
>Priority: Major
>  Labels: stale-major
> Attachments: 1.png, 2.png
>
>
> The source code is simple, like this; there is nothing wrong with kafkaStream.
> If I replace $"zone_id" with another Integer-typed column it works fine,
> but with a String column it blows up:
> val kafkaStream = env.addSource(
>   new FlinkKafkaConsumer011[BaseLog]("base_log", new BaseLogDeserializationSchema(), properties)
> )
> tenv.createTemporaryView("baselog", kafkaStream)
> tenv.from("baselog")
>   .select($"zone_id")
>   .toAppendStream[String]
>   .print()
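>
> A minimal workaround sketch (an untested assumption, not a confirmed fix):
> converting to Row instead of String takes a different conversion path and may
> sidestep the broken SinkConversion operator; the getField(0) extraction below
> is hypothetical.
>
> import org.apache.flink.types.Row
>
> tenv.from("baselog")
>   .select($"zone_id")
>   .toAppendStream[Row]                      // Row conversion path instead of String
>   .map(_.getField(0).asInstanceOf[String])  // manual extraction of the single field
>   .print()
>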
> The schema is a case class, like this:
> case class BaseLog(
>   var timestamp: Long,
>   log_id: Int,
>   fork_id: String,
>   zone_id: String,
>   plat_id: String,
>   ..
> )
> SQL explain:
> == Abstract Syntax Tree ==
> LogicalProject(zone_id=[$4])
> +- LogicalTableScan(table=[[default_catalog, default_database, baselog]])
> == Optimized Logical Plan ==
> Calc(select=[zone_id])
> +- DataStreamScan(table=[[default_catalog, default_database, baselog]], fields=[timestamp, log_id, fork_id, zone_id, plat_id,.])
> == Physical Execution Plan ==
> Stage 1 : Data Source
>   content : Source: Custom Source
>   Stage 2 : Operator
>     content : SourceConversion(table=[default_catalog.default_database.baselog], fields=[timestamp, log_id, fork_id, zone_id, plat_id,.])
>     ship_strategy : FORWARD
>   Stage 3 : Operator
>     content : Calc(select=[zone_id])
>     ship_strategy : FORWARD
> The exception is below:
> /* 1 */
> /* 2 */  public class SinkConversion$4 extends org.apache.flink.table.runtime.operators.TableStreamOperator
> /* 3 */  implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> /* 4 */
> /* 5 */    private final Object[] references;
> /* 6 */    private transient org.apache.flink.table.data.util.DataFormatConverters.StringConverter converter$3;
> /* 7 */    private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> /* 8 */
> /* 9 */    public SinkConversion$4(
> /* 10 */        Object[] references,
> /* 11 */        org.apache.flink.streaming.runtime.tasks.StreamTask task,
> /* 12 */        org.apache.flink.streaming.api.graph.StreamConfig config,
> /* 13 */        org.apache.flink.streaming.api.operators.Output output,
> /* 14 */        org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception {
> /* 15 */      this.references = references;
> /* 16 */      converter$3 = (((org.apache.flink.table.data.util.DataFormatConverters.StringConverter) references[0]));
> /* 17 */      this.setup(task, config, output);
> /* 18 */      if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
> /* 19 */        ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
> /* 20 */          .setProcessingTimeService(processingTimeService);
> /* 21 */      }
> /* 22 */    }
> /* 23 */
> /* 24 */    @Override
> /* 25 */    public void open() throws Exception {
> /* 26 */      super.open();
> /* 27 */
> /* 28 */    }
> /* 29 */
> /* 30 */    @Override
> /* 31 */    public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
> /* 32 */      org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue();
> /* 33 */
> /* 34 */
> /* 35 */
> /* 36 */
> /* 37 */
> /* 38 */      output.collect(outElement.replace((java.lang.String) 

[jira] [Commented] (FLINK-18940) Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"

2021-04-22 Thread Flink Jira Bot (Jira)


[ https://issues.apache.org/jira/browse/FLINK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17327691#comment-17327691 ]

Flink Jira Bot commented on FLINK-18940:


This major issue is unassigned, and neither it nor any of its Sub-Tasks has been 
updated for 30 days, so it has been labeled "stale-major". If this ticket 
is indeed "major", please either assign yourself or give an update, and afterwards 
remove the label. In 7 days the issue will be deprioritized.


[jira] [Commented] (FLINK-18940) Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"

2020-08-14 Thread Timo Walther (Jira)


[ https://issues.apache.org/jira/browse/FLINK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1715#comment-1715 ]

Timo Walther commented on FLINK-18940:
--

Thanks, this is definitely a bug. I'm currently working on a FLIP that reworks 
the DataStream <-> Table interoperability, which would eventually also fix this. 
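
A rough sketch of how that reworked interoperability (what later shipped as
FLIP-136 in Flink 1.13) would express the same conversion. The toDataStream call
below is from that later API, an assumption here, and not available in the 1.11
release this ticket targets:

val table = tenv.from("baselog").select($"zone_id")

// The FLIP-136 conversion goes through a new type-mapping stack rather than
// the code-generated SinkConversion operator used by toAppendStream[String].
tenv.toDataStream(table, classOf[String]).print()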


[jira] [Commented] (FLINK-18940) Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"

2020-08-13 Thread jack sun (Jira)


[ https://issues.apache.org/jira/browse/FLINK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177435#comment-17177435 ]

jack sun commented on FLINK-18940:
--

[~twalthr] Yep, I appended it to the description.


[jira] [Commented] (FLINK-18940) Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"

2020-08-13 Thread Timo Walther (Jira)


[ https://issues.apache.org/jira/browse/FLINK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177025#comment-17177025 ]

Timo Walther commented on FLINK-18940:
--

[~any] Could you explain the query/pipeline and schema that led to this 
exception and generated code?

> Cannot cast "org.apache.flink.table.data.RowData" to 
> "org.apache.flink.table.data.binary.BinaryStringData"
> --
>
> Key: FLINK-18940
> URL: https://issues.apache.org/jira/browse/FLINK-18940
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.11.1
> Environment: macOS Catalina
> java8
> scala 2.11
>Reporter: jack sun
>Priority: Major
>
> /* 1 */
> /* 2 */  public class SinkConversion$4 extends org.apache.flink.table.runtime.operators.TableStreamOperator
> /* 3 */  implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> /* 4 */
> /* 5 */    private final Object[] references;
> /* 6 */    private transient org.apache.flink.table.data.util.DataFormatConverters.StringConverter converter$3;
> /* 7 */    private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> /* 8 */
> /* 9 */    public SinkConversion$4(
> /* 10 */        Object[] references,
> /* 11 */        org.apache.flink.streaming.runtime.tasks.StreamTask task,
> /* 12 */        org.apache.flink.streaming.api.graph.StreamConfig config,
> /* 13 */        org.apache.flink.streaming.api.operators.Output output,
> /* 14 */        org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception {
> /* 15 */      this.references = references;
> /* 16 */      converter$3 = (((org.apache.flink.table.data.util.DataFormatConverters.StringConverter) references[0]));
> /* 17 */      this.setup(task, config, output);
> /* 18 */      if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
> /* 19 */        ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
> /* 20 */          .setProcessingTimeService(processingTimeService);
> /* 21 */      }
> /* 22 */    }
> /* 23 */
> /* 24 */    @Override
> /* 25 */    public void open() throws Exception {
> /* 26 */      super.open();
> /* 27 */
> /* 28 */    }
> /* 29 */
> /* 30 */    @Override
> /* 31 */    public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
> /* 32 */      org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue();
> /* 33 */
> /* 34 */
> /* 35 */
> /* 36 */
> /* 37 */
> /* 38 */      output.collect(outElement.replace((java.lang.String) converter$3.toExternal((org.apache.flink.table.data.binary.BinaryStringData) in1)));
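> //
> // The statement above is the reported bug: `in1` is the entire input row
> // (a RowData), yet it is cast straight to BinaryStringData instead of first
> // extracting the projected field (presumably via in1.getString(0)), which is
> // why this generated class cannot be compiled and instantiated.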
> /* 39 */
> /* 40 */    }
> /* 41 */
> /* 42 */
> /* 43 */
> /* 44 */    @Override
> /* 45 */    public void close() throws Exception {
> /* 46 */      super.close();
> /* 47 */
> /* 48 */    }
> /* 49 */
> /* 50 */
> /* 51 */  }
> /* 52 */
> 19:53:58,610 WARN  org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> SourceConversion(table=[default_catalog.default_database.baselog], fields=[timestamp, zone_id,..]) -> Calc(select=[zone_id]) -> SinkConversionToString -> Sink: Print to Std. Out (2/16) (8e0da69492bc45fffdbc17cb32c8f99b) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Could not instantiate generated class 'SinkConversion$4'
>   at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)
>   at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40)
>   at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:70)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:470)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
>   at 