[jira] [Closed] (FLINK-20859) java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter
[ https://issues.apache.org/jira/browse/FLINK-20859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jack sun closed FLINK-20859.
Resolution: Fixed

The org.apache.parquet parquet-avro 1.11.1 dependency needs to be added.

> java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter
> -
>
> Key: FLINK-20859
> URL: https://issues.apache.org/jira/browse/FLINK-20859
> Project: Flink
> Issue Type: Bug
> Components: API / Core
> Affects Versions: 1.12.0
> Reporter: jack sun
> Priority: Major
> Attachments: 1.png, 2.png
>
>
> Following the official Flink 1.12 StreamingFileSink example, a runtime error occurs:
> java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter
>     at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:84)
>     at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
>     at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:69)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:83)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNewInProgressFile(BulkBucketWriter.java:35)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:226)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:207)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:290)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:436)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
> Caused by: java.lang.ClassNotFoundException: org.apache.parquet.avro.AvroParquetWriter
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>     at
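
The closing comment on FLINK-20859 says the missing class is provided by parquet-avro. As a minimal sketch, assuming the job is built with sbt (the report does not say which build tool was used), the dependency could be declared as below. The group, artifact, and version are the ones quoted in the comment; everything else is an assumption.

```scala
// build.sbt fragment (assumption: the project uses sbt; the report does not say).
// Adds the artifact named in the resolution so that
// org.apache.parquet.avro.AvroParquetWriter is on the classpath at runtime.
libraryDependencies += "org.apache.parquet" % "parquet-avro" % "1.11.1"
```

A Maven or Gradle build would declare the same coordinates in its own syntax.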
[jira] [Created] (FLINK-20859) java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter
jack sun created FLINK-20859:

Summary: java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter
Key: FLINK-20859
URL: https://issues.apache.org/jira/browse/FLINK-20859
Project: Flink
Issue Type: Bug
Components: API / Core
Affects Versions: 1.12.0
Reporter: jack sun
Attachments: 1.png, 2.png

Following the official Flink 1.12 StreamingFileSink example, a runtime error occurs:

java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter
    at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:84)
    at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
    at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:69)
    at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:83)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNewInProgressFile(BulkBucketWriter.java:35)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:226)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:207)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:290)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:436)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
Caused by: java.lang.ClassNotFoundException: org.apache.parquet.avro.AvroParquetWriter
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19015) java.lang.RuntimeException: Could not instantiate generated class 'GroupAggsHandler$15'
[ https://issues.apache.org/jira/browse/FLINK-19015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17189097#comment-17189097 ]

jack sun commented on FLINK-19015:
--

[~gkgkgk] No, it throws a 'reassignment to val' error. If I replace the type parameters with [Int, class], it works well.

> java.lang.RuntimeException: Could not instantiate generated class 'GroupAggsHandler$15'
> ---
>
> Key: FLINK-19015
> URL: https://issues.apache.org/jira/browse/FLINK-19015
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.11.1
> Reporter: jack sun
> Priority: Major
>
> source code:
> import org.apache.flink.api.common.eventtime.{Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.table.functions.AggregateFunction
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.api._
> import org.apache.flink.table.api.bridge.scala._
> import org.apache.flink.types.Row
> object TestAggFunction {
>   def main(args: Array[String]) {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val tenv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build())
>     env.setParallelism(1)
>     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>     val socketStream = env.socketTextStream("127.0.0.1",9090)
>       .map(x=>{
>         val c=x.split(" ").toList
>         Temp3(c(0),c(1).toInt,System.currentTimeMillis())
>       })
>       .assignTimestampsAndWatermarks(new WatermarkStrategy[Temp3] {
>         override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[Temp3] = {
>           new WatermarkGenerator[Temp3] {
>             val delay:Long = 0L//Time.seconds(10).toMilliseconds
>             var maxTimestamp: Long = 0L
>
>             override def onEvent(t: Temp3, l: Long, watermarkOutput: WatermarkOutput): Unit = {
>               maxTimestamp = maxTimestamp.max(t.timestamp)
>               val wm = new Watermark(maxTimestamp - delay)
>               watermarkOutput.emitWatermark(wm)
>             }
>
>             override def onPeriodicEmit(watermarkOutput: WatermarkOutput): Unit = Nil
>           }
>         }
>       })
>     val table = tenv.fromDataStream(socketStream,'role,'value,'pt.proctime)
>     tenv.createTemporaryView("t1",table)
>     tenv.registerFunction("testMax",new MaxAgg)
>     tenv.sqlQuery("select role,testMax(`value`) from t1 group by role").toRetractStream[Row].print()
>     //tenv.sqlQuery("select * from t1").toRetractStream[Row].print()
>     env.execute("test")
>   }
> }
> case class Temp3(role:String,value:Int,timestamp:Long)
> class MaxAgg extends AggregateFunction[Int,Int]{
>   override def getValue(acc: Int): Int = acc
>   override def createAccumulator(): Int = 0
>   def accumulate(acc: Int, rowVal:Int): Unit = acc.max(rowVal)
> }
> exceptions:
> 18:09:49,800 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - Log file environment variable 'log.file' is not set.
> 18:09:49,800 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
> 18:09:50,204 WARN org.apache.flink.metrics.MetricGroup - The operator name SourceConversion(table=[default_catalog.default_database.t1], fields=[role, value, pt]) exceeded the 80 characters length limit and was truncated.
> /* 1 */
> /* 2 */public final class GroupAggsHandler$15 implements org.apache.flink.table.runtime.generated.AggsHandleFunction {
> /* 3 */
> /* 4 */  private transient com.youyantech.streamJobs.MaxAgg function_com$youyantech$streamJobs$MaxAgg$b5e566eecccd9c2ccca51abaa3afee5e;
> /* 5 */  org.apache.flink.table.data.GenericRowData acc$7 = new org.apache.flink.table.data.GenericRowData(1);
> /* 6 */  org.apache.flink.table.data.GenericRowData acc$8 = new org.apache.flink.table.data.GenericRowData(1);
> /* 7 */  private java.lang.Integer agg0_acc_internal;
> /* 8 */  private java.lang.Integer agg0_acc_external;
> /* 9 */  org.apache.flink.table.data.GenericRowData aggValue$14 = new org.apache.flink.table.data.GenericRowData(1);
> /* 10 */
> /* 11 */
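
The comment on FLINK-19015 reports that the function works once the accumulator is a class rather than an `Int`. The sketch below illustrates that idea in plain Scala, without Flink on the classpath. `MaxAcc`, `MaxAggSketch`, and `maxOf` are hypothetical names that do not appear in the report, and the thread does not state that this is the exact change the reporter made. In the reported code, `acc.max(rowVal)` computes a new `Int` and discards it, and a method parameter cannot be reassigned, so an `Int` accumulator never changes.

```scala
// Hypothetical accumulator: a class with a mutable field, so accumulate()
// can update state in place instead of trying to reassign its parameter.
final class MaxAcc {
  var value: Int = 0 // same starting value as createAccumulator() in the report
}

// Plain-Scala stand-in for the three methods of the reported MaxAgg.
// Assumption: in a Flink job these would sit in a class extending
// AggregateFunction[Int, MaxAcc].
object MaxAggSketch {
  def createAccumulator(): MaxAcc = new MaxAcc

  def accumulate(acc: MaxAcc, rowVal: Int): Unit = {
    // Writing to a field is allowed; `acc = ...` would not compile because
    // method parameters are vals ("reassignment to val").
    acc.value = math.max(acc.value, rowVal)
  }

  def getValue(acc: MaxAcc): Int = acc.value

  // Feeds a sequence of values through one accumulator, one call per row.
  def maxOf(values: Seq[Int]): Int = {
    val acc = createAccumulator()
    values.foreach(v => accumulate(acc, v))
    getValue(acc)
  }
}
```

For example, `MaxAggSketch.maxOf(Seq(3, 9, 4))` returns 9. Because the accumulator starts at 0, as in the report, inputs that are all negative would yield 0.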
[jira] [Commented] (FLINK-19015) java.lang.RuntimeException: Could not instantiate generated class 'GroupAggsHandler$15'
[ https://issues.apache.org/jira/browse/FLINK-19015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17182930#comment-17182930 ]

jack sun commented on FLINK-19015:
--

[~jark] Java SDK 1.8, Scala 2.11, macOS Catalina 10.15.6

> java.lang.RuntimeException: Could not instantiate generated class 'GroupAggsHandler$15'
> ---
>
> Key: FLINK-19015
> URL: https://issues.apache.org/jira/browse/FLINK-19015
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.11.1
> Reporter: jack sun
> Priority: Major
>
> source code:
> import org.apache.flink.api.common.eventtime.{Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.table.functions.AggregateFunction
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.api._
> import org.apache.flink.table.api.bridge.scala._
> import org.apache.flink.types.Row
> object TestAggFunction {
>   def main(args: Array[String]) {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val tenv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build())
>     env.setParallelism(1)
>     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>     val socketStream = env.socketTextStream("127.0.0.1",9090)
>       .map(x=>{
>         val c=x.split(" ").toList
>         Temp3(c(0),c(1).toInt,System.currentTimeMillis())
>       })
>       .assignTimestampsAndWatermarks(new WatermarkStrategy[Temp3] {
>         override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[Temp3] = {
>           new WatermarkGenerator[Temp3] {
>             val delay:Long = 0L//Time.seconds(10).toMilliseconds
>             var maxTimestamp: Long = 0L
>
>             override def onEvent(t: Temp3, l: Long, watermarkOutput: WatermarkOutput): Unit = {
>               maxTimestamp = maxTimestamp.max(t.timestamp)
>               val wm = new Watermark(maxTimestamp - delay)
>               watermarkOutput.emitWatermark(wm)
>             }
>
>             override def onPeriodicEmit(watermarkOutput: WatermarkOutput): Unit = Nil
>           }
>         }
>       })
>     val table = tenv.fromDataStream(socketStream,'role,'value,'pt.proctime)
>     tenv.createTemporaryView("t1",table)
>     tenv.registerFunction("testMax",new MaxAgg)
>     tenv.sqlQuery("select role,testMax(`value`) from t1 group by role").toRetractStream[Row].print()
>     //tenv.sqlQuery("select * from t1").toRetractStream[Row].print()
>     env.execute("test")
>   }
> }
> case class Temp3(role:String,value:Int,timestamp:Long)
> class MaxAgg extends AggregateFunction[Int,Int]{
>   override def getValue(acc: Int): Int = acc
>   override def createAccumulator(): Int = 0
>   def accumulate(acc: Int, rowVal:Int): Unit = acc.max(rowVal)
> }
> exceptions:
> 18:09:49,800 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - Log file environment variable 'log.file' is not set.
> 18:09:49,800 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
> 18:09:50,204 WARN org.apache.flink.metrics.MetricGroup - The operator name SourceConversion(table=[default_catalog.default_database.t1], fields=[role, value, pt]) exceeded the 80 characters length limit and was truncated.
> /* 1 */
> /* 2 */public final class GroupAggsHandler$15 implements org.apache.flink.table.runtime.generated.AggsHandleFunction {
> /* 3 */
> /* 4 */  private transient com.youyantech.streamJobs.MaxAgg function_com$youyantech$streamJobs$MaxAgg$b5e566eecccd9c2ccca51abaa3afee5e;
> /* 5 */  org.apache.flink.table.data.GenericRowData acc$7 = new org.apache.flink.table.data.GenericRowData(1);
> /* 6 */  org.apache.flink.table.data.GenericRowData acc$8 = new org.apache.flink.table.data.GenericRowData(1);
> /* 7 */  private java.lang.Integer agg0_acc_internal;
> /* 8 */  private java.lang.Integer agg0_acc_external;
> /* 9 */  org.apache.flink.table.data.GenericRowData aggValue$14 = new org.apache.flink.table.data.GenericRowData(1);
> /* 10 */
> /* 11 */  private
>
[jira] [Updated] (FLINK-19015) java.lang.RuntimeException: Could not instantiate generated class 'GroupAggsHandler$15'
[ https://issues.apache.org/jira/browse/FLINK-19015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jack sun updated FLINK-19015:
-
Description:

source code:

import org.apache.flink.api.common.eventtime.{Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object TestAggFunction {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tenv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build())
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    val socketStream = env.socketTextStream("127.0.0.1",9090)
      .map(x=>{
        val c=x.split(" ").toList
        Temp3(c(0),c(1).toInt,System.currentTimeMillis())
      })
      .assignTimestampsAndWatermarks(new WatermarkStrategy[Temp3] {
        override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[Temp3] = {
          new WatermarkGenerator[Temp3] {
            val delay:Long = 0L//Time.seconds(10).toMilliseconds
            var maxTimestamp: Long = 0L

            override def onEvent(t: Temp3, l: Long, watermarkOutput: WatermarkOutput): Unit = {
              maxTimestamp = maxTimestamp.max(t.timestamp)
              val wm = new Watermark(maxTimestamp - delay)
              watermarkOutput.emitWatermark(wm)
            }

            override def onPeriodicEmit(watermarkOutput: WatermarkOutput): Unit = Nil
          }
        }
      })
    val table = tenv.fromDataStream(socketStream,'role,'value,'pt.proctime)
    tenv.createTemporaryView("t1",table)
    tenv.registerFunction("testMax",new MaxAgg)
    tenv.sqlQuery("select role,testMax(`value`) from t1 group by role").toRetractStream[Row].print()
    //tenv.sqlQuery("select * from t1").toRetractStream[Row].print()
    env.execute("test")
  }
}

case class Temp3(role:String,value:Int,timestamp:Long)

class MaxAgg extends AggregateFunction[Int,Int]{
  override def getValue(acc: Int): Int = acc
  override def createAccumulator(): Int = 0
  def accumulate(acc: Int, rowVal:Int): Unit = acc.max(rowVal)
}

exceptions:

18:09:49,800 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - Log file environment variable 'log.file' is not set.
18:09:49,800 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'web.log.path'.
18:09:50,204 WARN org.apache.flink.metrics.MetricGroup - The operator name SourceConversion(table=[default_catalog.default_database.t1], fields=[role, value, pt]) exceeded the 80 characters length limit and was truncated.
/* 1 */
/* 2 */public final class GroupAggsHandler$15 implements org.apache.flink.table.runtime.generated.AggsHandleFunction {
/* 3 */
/* 4 */  private transient com.youyantech.streamJobs.MaxAgg function_com$youyantech$streamJobs$MaxAgg$b5e566eecccd9c2ccca51abaa3afee5e;
/* 5 */  org.apache.flink.table.data.GenericRowData acc$7 = new org.apache.flink.table.data.GenericRowData(1);
/* 6 */  org.apache.flink.table.data.GenericRowData acc$8 = new org.apache.flink.table.data.GenericRowData(1);
/* 7 */  private java.lang.Integer agg0_acc_internal;
/* 8 */  private java.lang.Integer agg0_acc_external;
/* 9 */  org.apache.flink.table.data.GenericRowData aggValue$14 = new org.apache.flink.table.data.GenericRowData(1);
/* 10 */
/* 11 */  private org.apache.flink.table.runtime.dataview.StateDataViewStore store;
/* 12 */
/* 13 */  public GroupAggsHandler$15(java.lang.Object[] references) throws Exception {
/* 14 */    function_com$youyantech$streamJobs$MaxAgg$b5e566eecccd9c2ccca51abaa3afee5e = (((com.youyantech.streamJobs.MaxAgg) references[0]));
/* 15 */  }
/* 16 */
/* 17 */  private org.apache.flink.api.common.functions.RuntimeContext getRuntimeContext() {
/* 18 */    return store.getRuntimeContext();
/* 19 */  }
/* 20 */
/* 21 */  @Override
/* 22 */  public void open(org.apache.flink.table.runtime.dataview.StateDataViewStore store) throws Exception {
/* 23 */    this.store = store;
/* 24 */
[jira] [Created] (FLINK-19015) java.lang.RuntimeException: Could not instantiate generated class 'GroupAggsHandler$15'
jack sun created FLINK-19015:

Summary: java.lang.RuntimeException: Could not instantiate generated class 'GroupAggsHandler$15'
Key: FLINK-19015
URL: https://issues.apache.org/jira/browse/FLINK-19015
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.11.1
Reporter: jack sun

source code:

import org.apache.flink.api.common.eventtime.{Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object TestAggFunction {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tenv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build())
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    val socketStream = env.socketTextStream("127.0.0.1",9090)
      .map(x=>{
        val c=x.split(" ").toList
        Temp3(c(0),c(1).toInt,System.currentTimeMillis())
      })
      .assignTimestampsAndWatermarks(new WatermarkStrategy[Temp3] {
        override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[Temp3] = {
          new WatermarkGenerator[Temp3] {
            val delay:Long = 0L//Time.seconds(10).toMilliseconds
            var maxTimestamp: Long = 0L

            override def onEvent(t: Temp3, l: Long, watermarkOutput: WatermarkOutput): Unit = {
              maxTimestamp = maxTimestamp.max(t.timestamp)
              val wm = new Watermark(maxTimestamp - delay)
              watermarkOutput.emitWatermark(wm)
            }

            override def onPeriodicEmit(watermarkOutput: WatermarkOutput): Unit = Nil
          }
        }
      })
    val table = tenv.fromDataStream(socketStream,'role,'value,'pt.proctime)
    tenv.createTemporaryView("t1",table)
    tenv.registerFunction("testMax",new MaxAgg)
    tenv.sqlQuery("select role,testMax(`value`) from t1 group by role").toRetractStream[Row].print()
    //tenv.sqlQuery("select * from t1").toRetractStream[Row].print()
    env.execute("test")
  }
}

case class Temp3(role:String,value:Int,timestamp:Long)

class MaxAgg extends AggregateFunction[Int,Int]{
  override def getValue(acc: Int): Int = acc
  override def createAccumulator(): Int = 0
  def accumulate(acc: Int, rowVal:Int): Unit = acc.max(rowVal)
}

exceptions:

/Library/Java/JavaVirtualMachines/jdk1.8.0_231.jdk/Contents/Home/bin/java -javaagent:/Applications/IntelliJ IDEA CE.app/Contents/lib/idea_rt.jar=57789:/Applications/IntelliJ IDEA CE.app/Contents/bin -Dfile.encoding=UTF-8 -classpath
[jira] [Updated] (FLINK-18940) Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"
[ https://issues.apache.org/jira/browse/FLINK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jack sun updated FLINK-18940:
-
Attachment: 2.png

> Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"
> --
>
> Key: FLINK-18940
> URL: https://issues.apache.org/jira/browse/FLINK-18940
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.11.1
> Environment: macOS Catalina
> java8
> scala 2.11
> Reporter: jack sun
> Priority: Major
> Attachments: 1.png, 2.png
>
>
> The source code is simple, as shown here. There is nothing wrong with kafkaStream: if
> I replace $"zone_id" with a column of Integer type, it works fine,
> but with a String column it fails.
> val kafkaStream = env.addSource(
>   new FlinkKafkaConsumer011[BaseLog]("base_log", new BaseLogDeserializationSchema(), properties)
> )
> tenv.createTemporaryView("baselog",kafkaStream)
> tenv.from("baselog")
>   .select($"zone_id")
>   .toAppendStream[String]
>   .print()
> The schema is a case class, like this:
> case class BaseLog(
>   var timestamp: Long,
>   log_id: Int,
>   fork_id: String ,
>   zone_id: String,
>   plat_id: String,
>   ..
> )
> sql explain:
> == Abstract Syntax Tree ==
> LogicalProject(zone_id=[$4])
> +- LogicalTableScan(table=[[default_catalog, default_database, baselog]])
> == Optimized Logical Plan ==
> Calc(select=[zone_id])
> +- DataStreamScan(table=[[default_catalog, default_database, baselog]], fields=[timestamp, log_id, fork_id, zone_id, plat_id,.])
> == Physical Execution Plan ==
> Stage 1 : Data Source
>   content : Source: Custom Source
> Stage 2 : Operator
>   content : SourceConversion(table=[default_catalog.default_database.baselog], fields=[timestamp, log_id, fork_id, zone_id, plat_id,.])
>   ship_strategy : FORWARD
> Stage 3 : Operator
>   content : Calc(select=[zone_id])
>   ship_strategy : FORWARD
> The exception is below:
> /* 1 */
> /* 2 */ public class SinkConversion$4 extends org.apache.flink.table.runtime.operators.TableStreamOperator
> /* 3 */     implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> /* 4 */
> /* 5 */   private final Object[] references;
> /* 6 */   private transient org.apache.flink.table.data.util.DataFormatConverters.StringConverter converter$3;
> /* 7 */   private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> /* 8 */
> /* 9 */   public SinkConversion$4(
> /* 10 */       Object[] references,
> /* 11 */       org.apache.flink.streaming.runtime.tasks.StreamTask task,
> /* 12 */       org.apache.flink.streaming.api.graph.StreamConfig config,
> /* 13 */       org.apache.flink.streaming.api.operators.Output output,
> /* 14 */       org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception {
> /* 15 */     this.references = references;
> /* 16 */     converter$3 = (((org.apache.flink.table.data.util.DataFormatConverters.StringConverter) references[0]));
> /* 17 */     this.setup(task, config, output);
> /* 18 */     if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
> /* 19 */       ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
> /* 20 */         .setProcessingTimeService(processingTimeService);
> /* 21 */     }
> /* 22 */   }
> /* 23 */
> /* 24 */   @Override
> /* 25 */   public void open() throws Exception {
> /* 26 */     super.open();
> /* 27 */
> /* 28 */   }
> /* 29 */
> /* 30 */   @Override
> /* 31 */   public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
> /* 32 */     org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue();
> /* 33 */
> /* 34 */
> /* 35 */
> /* 36 */
> /* 37 */
> /* 38 */     output.collect(outElement.replace((java.lang.String) converter$3.toExternal((org.apache.flink.table.data.binary.BinaryStringData) in1)));
> /* 39 */
> /* 40 */   }
> /* 41 */
> /* 42 */
> /* 43 */
> /* 44 */   @Override
> /* 45 */   public void close() throws Exception {
> /* 46 */     super.close();
> /* 47 */
> /* 48 */
[jira] [Updated] (FLINK-18940) Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"
[ https://issues.apache.org/jira/browse/FLINK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jack sun updated FLINK-18940:
-
Attachment: 1.png

> Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"
> --
>
> Key: FLINK-18940
> URL: https://issues.apache.org/jira/browse/FLINK-18940
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.11.1
> Environment: macOS Catalina
> java8
> scala 2.11
> Reporter: jack sun
> Priority: Major
> Attachments: 1.png, 2.png
>
>
> The source code is simple, as shown here. There is nothing wrong with kafkaStream: if
> I replace $"zone_id" with a column of Integer type, it works fine,
> but with a String column it fails.
> val kafkaStream = env.addSource(
>   new FlinkKafkaConsumer011[BaseLog]("base_log", new BaseLogDeserializationSchema(), properties)
> )
> tenv.createTemporaryView("baselog",kafkaStream)
> tenv.from("baselog")
>   .select($"zone_id")
>   .toAppendStream[String]
>   .print()
> The schema is a case class, like this:
> case class BaseLog(
>   var timestamp: Long,
>   log_id: Int,
>   fork_id: String ,
>   zone_id: String,
>   plat_id: String,
>   ..
> )
> sql explain:
> == Abstract Syntax Tree ==
> LogicalProject(zone_id=[$4])
> +- LogicalTableScan(table=[[default_catalog, default_database, baselog]])
> == Optimized Logical Plan ==
> Calc(select=[zone_id])
> +- DataStreamScan(table=[[default_catalog, default_database, baselog]], fields=[timestamp, log_id, fork_id, zone_id, plat_id,.])
> == Physical Execution Plan ==
> Stage 1 : Data Source
>   content : Source: Custom Source
> Stage 2 : Operator
>   content : SourceConversion(table=[default_catalog.default_database.baselog], fields=[timestamp, log_id, fork_id, zone_id, plat_id,.])
>   ship_strategy : FORWARD
> Stage 3 : Operator
>   content : Calc(select=[zone_id])
>   ship_strategy : FORWARD
> The exception is below:
> /* 1 */
> /* 2 */ public class SinkConversion$4 extends org.apache.flink.table.runtime.operators.TableStreamOperator
> /* 3 */     implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> /* 4 */
> /* 5 */   private final Object[] references;
> /* 6 */   private transient org.apache.flink.table.data.util.DataFormatConverters.StringConverter converter$3;
> /* 7 */   private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> /* 8 */
> /* 9 */   public SinkConversion$4(
> /* 10 */       Object[] references,
> /* 11 */       org.apache.flink.streaming.runtime.tasks.StreamTask task,
> /* 12 */       org.apache.flink.streaming.api.graph.StreamConfig config,
> /* 13 */       org.apache.flink.streaming.api.operators.Output output,
> /* 14 */       org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception {
> /* 15 */     this.references = references;
> /* 16 */     converter$3 = (((org.apache.flink.table.data.util.DataFormatConverters.StringConverter) references[0]));
> /* 17 */     this.setup(task, config, output);
> /* 18 */     if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
> /* 19 */       ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
> /* 20 */         .setProcessingTimeService(processingTimeService);
> /* 21 */     }
> /* 22 */   }
> /* 23 */
> /* 24 */   @Override
> /* 25 */   public void open() throws Exception {
> /* 26 */     super.open();
> /* 27 */
> /* 28 */   }
> /* 29 */
> /* 30 */   @Override
> /* 31 */   public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
> /* 32 */     org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue();
> /* 33 */
> /* 34 */
> /* 35 */
> /* 36 */
> /* 37 */
> /* 38 */     output.collect(outElement.replace((java.lang.String) converter$3.toExternal((org.apache.flink.table.data.binary.BinaryStringData) in1)));
> /* 39 */
> /* 40 */   }
> /* 41 */
> /* 42 */
> /* 43 */
> /* 44 */   @Override
> /* 45 */   public void close() throws Exception {
> /* 46 */     super.close();
> /* 47 */
> /* 48 */
[jira] [Commented] (FLINK-18940) Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"
[ https://issues.apache.org/jira/browse/FLINK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177435#comment-17177435 ]

jack sun commented on FLINK-18940:
----------------------------------

[~twalthr] Yep, I appended it to the description.

> Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"
> -----------------------------------------------------------------------------------------------------------
>
> Key: FLINK-18940
> URL: https://issues.apache.org/jira/browse/FLINK-18940
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.11.1
> Environment: macOS Catalina
>   java8
>   scala 2.11
> Reporter: jack sun
> Priority: Major
>
> The source code is simple, as shown below. There is nothing wrong with kafkaStream:
> if I replace $"zone_id" with a column of Integer type, it works fine,
> but with a String column it fails.
>
> val kafkaStream = env.addSource(
>   new FlinkKafkaConsumer011[BaseLog]("base_log", new BaseLogDeserializationSchema(), properties)
> )
> tenv.createTemporaryView("baselog", kafkaStream)
> tenv.from("baselog")
>   .select($"zone_id")
>   .toAppendStream[String]
>   .print()
>
> The schema is a case class, like this:
>
> case class BaseLog(
>   var timestamp: Long,
>   log_id: Int,
>   fork_id: String,
>   zone_id: String,
>   plat_id: String,
>   ..
> )
>
> SQL explain:
>
> == Abstract Syntax Tree ==
> LogicalProject(zone_id=[$4])
> +- LogicalTableScan(table=[[default_catalog, default_database, baselog]])
>
> == Optimized Logical Plan ==
> Calc(select=[zone_id])
> +- DataStreamScan(table=[[default_catalog, default_database, baselog]], fields=[timestamp, log_id, fork_id, zone_id, plat_id,.])
>
> == Physical Execution Plan ==
> Stage 1 : Data Source
>   content : Source: Custom Source
>   Stage 2 : Operator
>     content : SourceConversion(table=[default_catalog.default_database.baselog], fields=[timestamp, log_id, fork_id, zone_id, plat_id,.])
>     ship_strategy : FORWARD
>     Stage 3 : Operator
>       content : Calc(select=[zone_id])
>       ship_strategy : FORWARD
>
> The exception is below:
>
> public class SinkConversion$4 extends org.apache.flink.table.runtime.operators.TableStreamOperator
>     implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
>
>   private final Object[] references;
>   private transient org.apache.flink.table.data.util.DataFormatConverters.StringConverter converter$3;
>   private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
>
>   public SinkConversion$4(
>       Object[] references,
>       org.apache.flink.streaming.runtime.tasks.StreamTask task,
>       org.apache.flink.streaming.api.graph.StreamConfig config,
>       org.apache.flink.streaming.api.operators.Output output,
>       org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception {
>     this.references = references;
>     converter$3 = (((org.apache.flink.table.data.util.DataFormatConverters.StringConverter) references[0]));
>     this.setup(task, config, output);
>     if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
>       ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
>         .setProcessingTimeService(processingTimeService);
>     }
>   }
>
>   @Override
>   public void open() throws Exception {
>     super.open();
>   }
>
>   @Override
>   public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
>     org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue();
>
>     output.collect(outElement.replace((java.lang.String) converter$3.toExternal((org.apache.flink.table.data.binary.BinaryStringData) in1)));
>   }
>
>   @Override
>   public void close() throws Exception {
>     super.close();
[jira] [Updated] (FLINK-18940) Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"
[ https://issues.apache.org/jira/browse/FLINK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jack sun updated FLINK-18940:
-----------------------------
    Description:

The source code is simple, as shown below. There is nothing wrong with kafkaStream:
if I replace $"zone_id" with a column of Integer type, it works fine,
but with a String column it fails.

val kafkaStream = env.addSource(
  new FlinkKafkaConsumer011[BaseLog]("base_log", new BaseLogDeserializationSchema(), properties)
)
tenv.createTemporaryView("baselog", kafkaStream)
tenv.from("baselog")
  .select($"zone_id")
  .toAppendStream[String]
  .print()

The schema is a case class, like this:

case class BaseLog(
  var timestamp: Long,
  log_id: Int,
  fork_id: String,
  zone_id: String,
  plat_id: String,
  ..
)

SQL explain:

== Abstract Syntax Tree ==
LogicalProject(zone_id=[$4])
+- LogicalTableScan(table=[[default_catalog, default_database, baselog]])

== Optimized Logical Plan ==
Calc(select=[zone_id])
+- DataStreamScan(table=[[default_catalog, default_database, baselog]], fields=[timestamp, log_id, fork_id, zone_id, plat_id,.])

== Physical Execution Plan ==
Stage 1 : Data Source
  content : Source: Custom Source
  Stage 2 : Operator
    content : SourceConversion(table=[default_catalog.default_database.baselog], fields=[timestamp, log_id, fork_id, zone_id, plat_id,.])
    ship_strategy : FORWARD
    Stage 3 : Operator
      content : Calc(select=[zone_id])
      ship_strategy : FORWARD

The exception is below:

public class SinkConversion$4 extends org.apache.flink.table.runtime.operators.TableStreamOperator
    implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {

  private final Object[] references;
  private transient org.apache.flink.table.data.util.DataFormatConverters.StringConverter converter$3;
  private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);

  public SinkConversion$4(
      Object[] references,
      org.apache.flink.streaming.runtime.tasks.StreamTask task,
      org.apache.flink.streaming.api.graph.StreamConfig config,
      org.apache.flink.streaming.api.operators.Output output,
      org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception {
    this.references = references;
    converter$3 = (((org.apache.flink.table.data.util.DataFormatConverters.StringConverter) references[0]));
    this.setup(task, config, output);
    if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
      ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
        .setProcessingTimeService(processingTimeService);
    }
  }

  @Override
  public void open() throws Exception {
    super.open();
  }

  @Override
  public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
    org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue();

    output.collect(outElement.replace((java.lang.String) converter$3.toExternal((org.apache.flink.table.data.binary.BinaryStringData) in1)));
  }

  @Override
  public void close() throws Exception {
    super.close();
  }
}

19:53:58,610 WARN org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> SourceConversion(table=[default_catalog.default_database.baselog], fields=[timestamp, zone_id,..]) -> Calc(select=[zone_id]) -> SinkConversionToString -> Sink: Print to Std. Out (2/16) (8e0da69492bc45fffdbc17cb32c8f99b) switched from RUNNING to FAILED.
java.lang.RuntimeException: Could not instantiate generated class 'SinkConversion$4'
	at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)
	at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40)
	at
[jira] [Updated] (FLINK-18940) Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"
[ https://issues.apache.org/jira/browse/FLINK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jack sun updated FLINK-18940:
-----------------------------
    Description:

The source code is simple, as shown below. There is nothing wrong with kafkaStream:
if I replace $"zone_id" with a column of Integer type, it works fine,
but with a String column it fails.

val kafkaStream = env.addSource(
  new FlinkKafkaConsumer011[BaseLog]("base_log", new BaseLogDeserializationSchema(), properties)
)
tenv.createTemporaryView("baselog", kafkaStream)
tenv.from("baselog")
  .select($"zone_id")
  .toAppendStream[String]
  .print()

The schema is a case class, like this:

case class BaseLog(
  var timestamp: Long,
  log_id: Int,
  fork_id: String,
  zone_id: String,
  plat_id: String,
  ..
)

The exception is below:

public class SinkConversion$4 extends org.apache.flink.table.runtime.operators.TableStreamOperator
    implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {

  private final Object[] references;
  private transient org.apache.flink.table.data.util.DataFormatConverters.StringConverter converter$3;
  private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);

  public SinkConversion$4(
      Object[] references,
      org.apache.flink.streaming.runtime.tasks.StreamTask task,
      org.apache.flink.streaming.api.graph.StreamConfig config,
      org.apache.flink.streaming.api.operators.Output output,
      org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception {
    this.references = references;
    converter$3 = (((org.apache.flink.table.data.util.DataFormatConverters.StringConverter) references[0]));
    this.setup(task, config, output);
    if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
      ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
        .setProcessingTimeService(processingTimeService);
    }
  }

  @Override
  public void open() throws Exception {
    super.open();
  }

  @Override
  public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
    org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue();

    output.collect(outElement.replace((java.lang.String) converter$3.toExternal((org.apache.flink.table.data.binary.BinaryStringData) in1)));
  }

  @Override
  public void close() throws Exception {
    super.close();
  }
}

19:53:58,610 WARN org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> SourceConversion(table=[default_catalog.default_database.baselog], fields=[timestamp, zone_id,..]) -> Calc(select=[zone_id]) -> SinkConversionToString -> Sink: Print to Std. Out (2/16) (8e0da69492bc45fffdbc17cb32c8f99b) switched from RUNNING to FAILED.
java.lang.RuntimeException: Could not instantiate generated class 'SinkConversion$4'
	at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)
	at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40)
	at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:70)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:470)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
	at
[jira] [Created] (FLINK-18940) Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"
jack sun created FLINK-18940:
--------------------------------

Summary: Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"
Key: FLINK-18940
URL: https://issues.apache.org/jira/browse/FLINK-18940
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.11.1
Environment: macOS Catalina
  java8
  scala 2.11
Reporter: jack sun


public class SinkConversion$4 extends org.apache.flink.table.runtime.operators.TableStreamOperator
    implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {

  private final Object[] references;
  private transient org.apache.flink.table.data.util.DataFormatConverters.StringConverter converter$3;
  private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);

  public SinkConversion$4(
      Object[] references,
      org.apache.flink.streaming.runtime.tasks.StreamTask task,
      org.apache.flink.streaming.api.graph.StreamConfig config,
      org.apache.flink.streaming.api.operators.Output output,
      org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception {
    this.references = references;
    converter$3 = (((org.apache.flink.table.data.util.DataFormatConverters.StringConverter) references[0]));
    this.setup(task, config, output);
    if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
      ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
        .setProcessingTimeService(processingTimeService);
    }
  }

  @Override
  public void open() throws Exception {
    super.open();
  }

  @Override
  public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
    org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue();

    output.collect(outElement.replace((java.lang.String) converter$3.toExternal((org.apache.flink.table.data.binary.BinaryStringData) in1)));
  }

  @Override
  public void close() throws Exception {
    super.close();
  }
}

19:53:58,610 WARN org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> SourceConversion(table=[default_catalog.default_database.baselog], fields=[timestamp, zone_id,..]) -> Calc(select=[zone_id]) -> SinkConversionToString -> Sink: Print to Std. Out (2/16) (8e0da69492bc45fffdbc17cb32c8f99b) switched from RUNNING to FAILED.
java.lang.RuntimeException: Could not instantiate generated class 'SinkConversion$4'
	at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)
	at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40)
	at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:70)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:470)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at
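
Note on the failing statement: in processElement, the generated code casts the whole input row (in1, a RowData) to BinaryStringData instead of reading the single string field out of the row, which appears to be what the "Cannot cast" message in the issue summary refers to. The following is only a minimal sketch of that difference in plain Java. Row, Text and SingleFieldRow are hypothetical stand-ins invented for this illustration, not Flink classes, and the sketch makes no claim about how the issue was actually resolved in Flink.

```java
// Hypothetical stand-in types for illustration only; these are NOT Flink classes.
final class SinkConversionSketch {

    /** Plays the role of a row of fields (like RowData in the report). */
    interface Row {
        Text getString(int pos);
    }

    /** Plays the role of an internal string value (like BinaryStringData). */
    static final class Text {
        private final String value;

        Text(String value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return value;
        }
    }

    /** A row holding exactly one string field, as in select($"zone_id"). */
    static final class SingleFieldRow implements Row {
        private final Text field;

        SingleFieldRow(Text field) {
            this.field = field;
        }

        @Override
        public Text getString(int pos) {
            if (pos != 0) {
                throw new IndexOutOfBoundsException("pos=" + pos);
            }
            return field;
        }
    }

    /** Mirrors the reported pattern: treats the row itself as the string value. */
    static String convertByCastingRow(Object element) {
        Row in1 = (Row) element;
        // The cast goes through Object so that this sketch compiles. A direct
        // (Text) in1 cast is rejected by the compiler, because Text is final
        // and does not implement Row. At runtime this cast always fails.
        return ((Text) (Object) in1).toString();
    }

    /** Reads field 0 out of the row before converting it. */
    static String convertByReadingField(Object element) {
        Row in1 = (Row) element;
        return in1.getString(0).toString();
    }

    public static void main(String[] args) {
        Object element = new SingleFieldRow(new Text("zone-1"));
        System.out.println(convertByReadingField(element));
        try {
            convertByCastingRow(element);
        } catch (ClassCastException e) {
            System.out.println("invalid cast: " + e.getClass().getSimpleName());
        }
    }
}
```

Running main prints the field value from convertByReadingField and then reports the ClassCastException raised by convertByCastingRow. In the report itself the equivalent direct cast sits in generated source, so the failure would surface when the generated class is compiled and instantiated rather than per record.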