[jira] [Closed] (FLINK-20859) java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter

2021-01-05 Thread jack sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jack sun closed FLINK-20859.

Resolution: Fixed

Need to add the following dependency:

<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.11.1</version>
</dependency>

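For context, a minimal sketch of the kind of job this affects, following the documented StreamingFileSink parquet example; the Event record, output path, and job name are illustrative placeholders, not the reporter's code. Without parquet-avro on the classpath, it fails in ParquetAvroWriters.createAvroParquetWriter with the NoClassDefFoundError quoted below:

import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

// Placeholder record type; any class usable via Avro reflection works.
case class Event(id: String, value: Int)

object ParquetSinkJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val events: DataStream[Event] = env.fromElements(Event("a", 1), Event("b", 2))

    // forReflectRecord is the factory seen in the stack trace; it needs
    // org.apache.parquet:parquet-avro on the runtime classpath.
    val sink: StreamingFileSink[Event] = StreamingFileSink
      .forBulkFormat(new Path("file:///tmp/parquet-out"),
        ParquetAvroWriters.forReflectRecord(classOf[Event]))
      .build()

    events.addSink(sink)
    env.execute("parquet-sink-example")
  }
}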

> java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter
> -
>
> Key: FLINK-20859
> URL: https://issues.apache.org/jira/browse/FLINK-20859
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.12.0
>Reporter: jack sun
>Priority: Major
> Attachments: 1.png, 2.png
>
>
> Following the official Flink 1.12 StreamingFileSink example, a runtime error occurs:
> java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter
>   at 
> org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:84)
>   at 
> org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
>   at 
> org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:69)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:83)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNewInProgressFile(BulkBucketWriter.java:35)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:226)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:207)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:290)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
>   at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:436)
>   at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>   at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>   at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>   at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>   at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
>   at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>   at 
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.parquet.avro.AvroParquetWriter
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>   at 

[jira] [Created] (FLINK-20859) java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter

2021-01-05 Thread jack sun (Jira)
jack sun created FLINK-20859:


 Summary: java.lang.NoClassDefFoundError: 
org/apache/parquet/avro/AvroParquetWriter
 Key: FLINK-20859
 URL: https://issues.apache.org/jira/browse/FLINK-20859
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.12.0
Reporter: jack sun
 Attachments: 1.png, 2.png

Following the official Flink 1.12 StreamingFileSink example, a runtime error occurs:

java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter
at 
org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:84)
at 
org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
at 
org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:69)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:83)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNewInProgressFile(BulkBucketWriter.java:35)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:226)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:207)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:290)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:436)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
Caused by: java.lang.ClassNotFoundException: 
org.apache.parquet.avro.AvroParquetWriter
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19015) java.lang.RuntimeException: Could not instantiate generated class 'GroupAggsHandler$15'

2020-09-02 Thread jack sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17189097#comment-17189097
 ] 

jack sun commented on FLINK-19015:
--

[~gkgkgk] No, it will throw a 'reassignment to val' error; replacing it with 
[Int, class], it works well (a sketch follows below).
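For anyone else hitting this: a hedged sketch of what '[Int, class]' presumably means here, i.e. keeping the Int result type but switching the accumulator to a mutable class, since a primitive Int accumulator parameter cannot be reassigned inside accumulate(). MaxAcc and MaxAgg2 are illustrative names, not from this ticket:

import org.apache.flink.table.functions.AggregateFunction

// Mutable accumulator: accumulate() can update it in place.
class MaxAcc { var max: Int = Int.MinValue }

class MaxAgg2 extends AggregateFunction[Int, MaxAcc] {
  override def createAccumulator(): MaxAcc = new MaxAcc
  override def getValue(acc: MaxAcc): Int = acc.max
  def accumulate(acc: MaxAcc, rowVal: Int): Unit = {
    if (rowVal > acc.max) acc.max = rowVal
  }
}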

> java.lang.RuntimeException: Could not instantiate generated class 
> 'GroupAggsHandler$15'
> ---
>
> Key: FLINK-19015
> URL: https://issues.apache.org/jira/browse/FLINK-19015
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.11.1
>Reporter: jack sun
>Priority: Major
>
> source code :
> import org.apache.flink.api.common.eventtime.{Watermark, WatermarkGenerator, 
> WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.table.functions.AggregateFunction
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.api._
> import org.apache.flink.table.api.bridge.scala._
> import org.apache.flink.types.Row
> object TestAggFunction {
> def main(args: Array[String]) {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tenv = StreamTableEnvironment.create(env, 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build())
> env.setParallelism(1)
> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
> val socketStream = env.socketTextStream("127.0.0.1",9090)
> .map(x=>{
>   val c=x.split(" ").toList
>   Temp3(c(0),c(1).toInt,System.currentTimeMillis())
> })
> .assignTimestampsAndWatermarks(new WatermarkStrategy[Temp3] {
> override def createWatermarkGenerator(context: 
> WatermarkGeneratorSupplier.Context): WatermarkGenerator[Temp3] = {
> new WatermarkGenerator[Temp3] {
> val delay:Long = 0L//Time.seconds(10).toMilliseconds
> var maxTimestamp: Long = 0L
> 
> override def onEvent(t: Temp3, l: Long, 
> watermarkOutput: WatermarkOutput): Unit = {
> maxTimestamp = maxTimestamp.max(t.timestamp)
> val wm = new Watermark(maxTimestamp - delay)
> watermarkOutput.emitWatermark(wm)
> }
> 
> override def onPeriodicEmit(watermarkOutput: 
> WatermarkOutput): Unit = Nil
> }
> }
> })
> val table = 
> tenv.fromDataStream(socketStream,'role,'value,'pt.proctime)
> tenv.createTemporaryView("t1",table)
> tenv.registerFunction("testMax",new MaxAgg)
> tenv.sqlQuery("select role,testMax(`value`) from t1 group by 
> role").toRetractStream[Row].print()
> //tenv.sqlQuery("select * from t1").toRetractStream[Row].print()
> env.execute("test")
> }
> }
> case class Temp3(role:String,value:Int,timestamp:Long)
> class MaxAgg extends AggregateFunction[Int,Int]{
> override def getValue(acc: Int): Int = acc
> override def createAccumulator(): Int = 0
> def accumulate(acc: Int, rowVal:Int): Unit = acc.max(rowVal)
> }
> exceptions:
> 18:09:49,800 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils
>- Log file environment variable 'log.file' is not set.
> 18:09:49,800 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils
>- JobManager log files are unavailable in the web dashboard. Log file 
> location not found in environment variable 'log.file' or configuration key 
> 'web.log.path'.
> 18:09:50,204 WARN  org.apache.flink.metrics.MetricGroup   
>- The operator name 
> SourceConversion(table=[default_catalog.default_database.t1], fields=[role, 
> value, pt]) exceeded the 80 characters length limit and was truncated.
> /* 1 */
> /* 2 */public final class GroupAggsHandler$15 implements 
> org.apache.flink.table.runtime.generated.AggsHandleFunction {
> /* 3 */
> /* 4 */  private transient com.youyantech.streamJobs.MaxAgg 
> function_com$youyantech$streamJobs$MaxAgg$b5e566eecccd9c2ccca51abaa3afee5e;
> /* 5 */  org.apache.flink.table.data.GenericRowData acc$7 = new 
> org.apache.flink.table.data.GenericRowData(1);
> /* 6 */  org.apache.flink.table.data.GenericRowData acc$8 = new 
> org.apache.flink.table.data.GenericRowData(1);
> /* 7 */  private java.lang.Integer agg0_acc_internal;
> /* 8 */  private java.lang.Integer agg0_acc_external;
> /* 9 */  org.apache.flink.table.data.GenericRowData aggValue$14 = new 
> org.apache.flink.table.data.GenericRowData(1);
> /* 10 */
> /* 11 */  

[jira] [Commented] (FLINK-19015) java.lang.RuntimeException: Could not instantiate generated class 'GroupAggsHandler$15'

2020-08-23 Thread jack sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17182930#comment-17182930
 ] 

jack sun commented on FLINK-19015:
--

[~jark] Java SDK 1.8, Scala 2.11, macOS Catalina 10.15.6

> java.lang.RuntimeException: Could not instantiate generated class 
> 'GroupAggsHandler$15'
> ---
>
> Key: FLINK-19015
> URL: https://issues.apache.org/jira/browse/FLINK-19015
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.11.1
>Reporter: jack sun
>Priority: Major
>
> source code :
> import org.apache.flink.api.common.eventtime.{Watermark, WatermarkGenerator, 
> WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.table.functions.AggregateFunction
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.api._
> import org.apache.flink.table.api.bridge.scala._
> import org.apache.flink.types.Row
> object TestAggFunction {
> def main(args: Array[String]) {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tenv = StreamTableEnvironment.create(env, 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build())
> env.setParallelism(1)
> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
> val socketStream = env.socketTextStream("127.0.0.1",9090)
> .map(x=>{
>   val c=x.split(" ").toList
>   Temp3(c(0),c(1).toInt,System.currentTimeMillis())
> })
> .assignTimestampsAndWatermarks(new WatermarkStrategy[Temp3] {
> override def createWatermarkGenerator(context: 
> WatermarkGeneratorSupplier.Context): WatermarkGenerator[Temp3] = {
> new WatermarkGenerator[Temp3] {
> val delay:Long = 0L//Time.seconds(10).toMilliseconds
> var maxTimestamp: Long = 0L
> 
> override def onEvent(t: Temp3, l: Long, 
> watermarkOutput: WatermarkOutput): Unit = {
> maxTimestamp = maxTimestamp.max(t.timestamp)
> val wm = new Watermark(maxTimestamp - delay)
> watermarkOutput.emitWatermark(wm)
> }
> 
> override def onPeriodicEmit(watermarkOutput: 
> WatermarkOutput): Unit = Nil
> }
> }
> })
> val table = 
> tenv.fromDataStream(socketStream,'role,'value,'pt.proctime)
> tenv.createTemporaryView("t1",table)
> tenv.registerFunction("testMax",new MaxAgg)
> tenv.sqlQuery("select role,testMax(`value`) from t1 group by 
> role").toRetractStream[Row].print()
> //tenv.sqlQuery("select * from t1").toRetractStream[Row].print()
> env.execute("test")
> }
> }
> case class Temp3(role:String,value:Int,timestamp:Long)
> class MaxAgg extends AggregateFunction[Int,Int]{
> override def getValue(acc: Int): Int = acc
> override def createAccumulator(): Int = 0
> def accumulate(acc: Int, rowVal:Int): Unit = acc.max(rowVal)
> }
> exceptions:
> 18:09:49,800 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils
>- Log file environment variable 'log.file' is not set.
> 18:09:49,800 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils
>- JobManager log files are unavailable in the web dashboard. Log file 
> location not found in environment variable 'log.file' or configuration key 
> 'web.log.path'.
> 18:09:50,204 WARN  org.apache.flink.metrics.MetricGroup   
>- The operator name 
> SourceConversion(table=[default_catalog.default_database.t1], fields=[role, 
> value, pt]) exceeded the 80 characters length limit and was truncated.
> /* 1 */
> /* 2 */public final class GroupAggsHandler$15 implements 
> org.apache.flink.table.runtime.generated.AggsHandleFunction {
> /* 3 */
> /* 4 */  private transient com.youyantech.streamJobs.MaxAgg 
> function_com$youyantech$streamJobs$MaxAgg$b5e566eecccd9c2ccca51abaa3afee5e;
> /* 5 */  org.apache.flink.table.data.GenericRowData acc$7 = new 
> org.apache.flink.table.data.GenericRowData(1);
> /* 6 */  org.apache.flink.table.data.GenericRowData acc$8 = new 
> org.apache.flink.table.data.GenericRowData(1);
> /* 7 */  private java.lang.Integer agg0_acc_internal;
> /* 8 */  private java.lang.Integer agg0_acc_external;
> /* 9 */  org.apache.flink.table.data.GenericRowData aggValue$14 = new 
> org.apache.flink.table.data.GenericRowData(1);
> /* 10 */
> /* 11 */  private 
> 

[jira] [Updated] (FLINK-19015) java.lang.RuntimeException: Could not instantiate generated class 'GroupAggsHandler$15'

2020-08-21 Thread jack sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-19015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jack sun updated FLINK-19015:
-
Description: 
source code :
import org.apache.flink.api.common.eventtime.{Watermark, WatermarkGenerator, 
WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object TestAggFunction {

def main(args: Array[String]) {

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tenv = StreamTableEnvironment.create(env, 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build())

env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val socketStream = env.socketTextStream("127.0.0.1",9090)
.map(x=>{
  val c=x.split(" ").toList
  Temp3(c(0),c(1).toInt,System.currentTimeMillis())
})
.assignTimestampsAndWatermarks(new WatermarkStrategy[Temp3] {
override def createWatermarkGenerator(context: 
WatermarkGeneratorSupplier.Context): WatermarkGenerator[Temp3] = {
new WatermarkGenerator[Temp3] {
val delay:Long = 0L//Time.seconds(10).toMilliseconds
var maxTimestamp: Long = 0L

override def onEvent(t: Temp3, l: Long, 
watermarkOutput: WatermarkOutput): Unit = {
maxTimestamp = maxTimestamp.max(t.timestamp)

val wm = new Watermark(maxTimestamp - delay)

watermarkOutput.emitWatermark(wm)
}

override def onPeriodicEmit(watermarkOutput: 
WatermarkOutput): Unit = Nil
}
}
})

val table = tenv.fromDataStream(socketStream,'role,'value,'pt.proctime)

tenv.createTemporaryView("t1",table)

tenv.registerFunction("testMax",new MaxAgg)

tenv.sqlQuery("select role,testMax(`value`) from t1 group by 
role").toRetractStream[Row].print()
//tenv.sqlQuery("select * from t1").toRetractStream[Row].print()

env.execute("test")
}
}

case class Temp3(role:String,value:Int,timestamp:Long)

class MaxAgg extends AggregateFunction[Int,Int]{

override def getValue(acc: Int): Int = acc

override def createAccumulator(): Int = 0

def accumulate(acc: Int, rowVal:Int): Unit = acc.max(rowVal)
}






exceptions:



18:09:49,800 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils  
 - Log file environment variable 'log.file' is not set.
18:09:49,800 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils  
 - JobManager log files are unavailable in the web dashboard. Log file location 
not found in environment variable 'log.file' or configuration key 
'web.log.path'.
18:09:50,204 WARN  org.apache.flink.metrics.MetricGroup 
 - The operator name 
SourceConversion(table=[default_catalog.default_database.t1], fields=[role, 
value, pt]) exceeded the 80 characters length limit and was truncated.
/* 1 */
/* 2 */public final class GroupAggsHandler$15 implements 
org.apache.flink.table.runtime.generated.AggsHandleFunction {
/* 3 */
/* 4 */  private transient com.youyantech.streamJobs.MaxAgg 
function_com$youyantech$streamJobs$MaxAgg$b5e566eecccd9c2ccca51abaa3afee5e;
/* 5 */  org.apache.flink.table.data.GenericRowData acc$7 = new 
org.apache.flink.table.data.GenericRowData(1);
/* 6 */  org.apache.flink.table.data.GenericRowData acc$8 = new 
org.apache.flink.table.data.GenericRowData(1);
/* 7 */  private java.lang.Integer agg0_acc_internal;
/* 8 */  private java.lang.Integer agg0_acc_external;
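/*        (note: the MaxAgg's Int accumulator surfaces here as boxed
          java.lang.Integer fields) */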
/* 9 */  org.apache.flink.table.data.GenericRowData aggValue$14 = new 
org.apache.flink.table.data.GenericRowData(1);
/* 10 */
/* 11 */  private 
org.apache.flink.table.runtime.dataview.StateDataViewStore store;
/* 12 */
/* 13 */  public GroupAggsHandler$15(java.lang.Object[] references) 
throws Exception {
/* 14 */
function_com$youyantech$streamJobs$MaxAgg$b5e566eecccd9c2ccca51abaa3afee5e = 
(((com.youyantech.streamJobs.MaxAgg) references[0]));
/* 15 */  }
/* 16 */
/* 17 */  private org.apache.flink.api.common.functions.RuntimeContext 
getRuntimeContext() {
/* 18 */return store.getRuntimeContext();
/* 19 */  }
/* 20 */
/* 21 */  @Override
/* 22 */  public void 
open(org.apache.flink.table.runtime.dataview.StateDataViewStore store) throws 
Exception {
/* 23 */this.store = store;
/* 24 */

[jira] [Created] (FLINK-19015) java.lang.RuntimeException: Could not instantiate generated class 'GroupAggsHandler$15'

2020-08-21 Thread jack sun (Jira)
jack sun created FLINK-19015:


 Summary: java.lang.RuntimeException: Could not instantiate 
generated class 'GroupAggsHandler$15'
 Key: FLINK-19015
 URL: https://issues.apache.org/jira/browse/FLINK-19015
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.11.1
Reporter: jack sun


source code :
import org.apache.flink.api.common.eventtime.{Watermark, WatermarkGenerator, 
WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object TestAggFunction {

def main(args: Array[String]) {

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tenv = StreamTableEnvironment.create(env, 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build())

env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val socketStream = env.socketTextStream("127.0.0.1",9090)
.map(x=>{
  val c=x.split(" ").toList
  Temp3(c(0),c(1).toInt,System.currentTimeMillis())
})
.assignTimestampsAndWatermarks(new WatermarkStrategy[Temp3] {
override def createWatermarkGenerator(context: 
WatermarkGeneratorSupplier.Context): WatermarkGenerator[Temp3] = {
new WatermarkGenerator[Temp3] {
val delay:Long = 0L//Time.seconds(10).toMilliseconds
var maxTimestamp: Long = 0L

override def onEvent(t: Temp3, l: Long, 
watermarkOutput: WatermarkOutput): Unit = {
maxTimestamp = maxTimestamp.max(t.timestamp)

val wm = new Watermark(maxTimestamp - delay)

watermarkOutput.emitWatermark(wm)
}

override def onPeriodicEmit(watermarkOutput: 
WatermarkOutput): Unit = Nil
}
}
})

val table = tenv.fromDataStream(socketStream,'role,'value,'pt.proctime)

tenv.createTemporaryView("t1",table)

tenv.registerFunction("testMax",new MaxAgg)

tenv.sqlQuery("select role,testMax(`value`) from t1 group by 
role").toRetractStream[Row].print()
//tenv.sqlQuery("select * from t1").toRetractStream[Row].print()

env.execute("test")
}
}

case class Temp3(role:String,value:Int,timestamp:Long)

class MaxAgg extends AggregateFunction[Int,Int]{

override def getValue(acc: Int): Int = acc

override def createAccumulator(): Int = 0

def accumulate(acc: Int, rowVal:Int): Unit = acc.max(rowVal)
}






exceptions:



/Library/Java/JavaVirtualMachines/jdk1.8.0_231.jdk/Contents/Home/bin/java 
-javaagent:/Applications/IntelliJ IDEA 
CE.app/Contents/lib/idea_rt.jar=57789:/Applications/IntelliJ IDEA 
CE.app/Contents/bin -Dfile.encoding=UTF-8 -classpath 

[jira] [Updated] (FLINK-18940) Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"

2020-08-13 Thread jack sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jack sun updated FLINK-18940:
-
Attachment: 2.png

> Cannot cast "org.apache.flink.table.data.RowData" to 
> "org.apache.flink.table.data.binary.BinaryStringData"
> --
>
> Key: FLINK-18940
> URL: https://issues.apache.org/jira/browse/FLINK-18940
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.11.1
> Environment: macOS Catalina
> java8
> scala 2.11
>Reporter: jack sun
>Priority: Major
> Attachments: 1.png, 2.png
>
>
> The source code is simple, like this; there is nothing wrong with kafkaStream. If I 
> replace $"zone_id" with another Integer-typed column it's fine,
> but with a String column, boom:
> val kafkaStream = env.addSource(
> new FlinkKafkaConsumer011[BaseLog]("base_log", new 
> BaseLogDeserializationSchema(), properties)
> )
> tenv.createTemporaryView("baselog",kafkaStream)
> tenv.from("baselog")
> .select($"zone_id")
> .toAppendStream[String]
> .print()
> the schema is a case class, like this:
> case class BaseLog(
> var timestamp: Long, 
> log_id: Int,
> fork_id: String ,
> zone_id: String,
> plat_id: String,
> ..
> )
> sql explain:
> == Abstract Syntax Tree ==
> LogicalProject(zone_id=[$4])
> +- LogicalTableScan(table=[[default_catalog, default_database, baselog]])
> == Optimized Logical Plan ==
> Calc(select=[zone_id])
> +- DataStreamScan(table=[[default_catalog, default_database, baselog]], 
> fields=[timestamp, log_id, fork_id, zone_id, plat_id,.])
> == Physical Execution Plan ==
> Stage 1 : Data Source
>   content : Source: Custom Source
>   Stage 2 : Operator
>   content : 
> SourceConversion(table=[default_catalog.default_database.baselog], 
> fields=[timestamp, log_id, fork_id, zone_id, plat_id,.])
>   ship_strategy : FORWARD
>   Stage 3 : Operator
>   content : Calc(select=[zone_id])
>   ship_strategy : FORWARD
> the exceptions below:
> /* 1 */
> /* 2 */  public class SinkConversion$4 extends 
> org.apache.flink.table.runtime.operators.TableStreamOperator
> /* 3 */  implements 
> org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> /* 4 */
> /* 5 */private final Object[] references;
> /* 6 */private transient 
> org.apache.flink.table.data.util.DataFormatConverters.StringConverter 
> converter$3;
> /* 7 */private final 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> /* 8 */
> /* 9 */public SinkConversion$4(
> /* 10 */Object[] references,
> /* 11 */org.apache.flink.streaming.runtime.tasks.StreamTask task,
> /* 12 */org.apache.flink.streaming.api.graph.StreamConfig config,
> /* 13 */org.apache.flink.streaming.api.operators.Output output,
> /* 14 */
> org.apache.flink.streaming.runtime.tasks.ProcessingTimeService 
> processingTimeService) throws Exception {
> /* 15 */  this.references = references;
> /* 16 */  converter$3 = 
> (((org.apache.flink.table.data.util.DataFormatConverters.StringConverter) 
> references[0]));
> /* 17 */  this.setup(task, config, output);
> /* 18 */  if (this instanceof 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
> /* 19 */
> ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
> /* 20 */  .setProcessingTimeService(processingTimeService);
> /* 21 */  }
> /* 22 */}
> /* 23 */
> /* 24 */@Override
> /* 25 */public void open() throws Exception {
> /* 26 */  super.open();
> /* 27 */  
> /* 28 */}
> /* 29 */
> /* 30 */@Override
> /* 31 */public void 
> processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
> element) throws Exception {
> /* 32 */  org.apache.flink.table.data.RowData in1 = 
> (org.apache.flink.table.data.RowData) element.getValue();
> /* 33 */  
> /* 34 */  
> /* 35 */  
> /* 36 */  
> /* 37 */  
> /* 38 */  output.collect(outElement.replace((java.lang.String) 
> converter$3.toExternal((org.apache.flink.table.data.binary.BinaryStringData) 
> in1)));
> /* 39 */  
> /* 40 */}
> /* 41 */
> /* 42 */
> /* 43 */
> /* 44 */@Override
> /* 45 */public void close() throws Exception {
> /* 46 */   super.close();
> /* 47 */  
> /* 48 */  

[jira] [Updated] (FLINK-18940) Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"

2020-08-13 Thread jack sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jack sun updated FLINK-18940:
-
Attachment: 1.png

> Cannot cast "org.apache.flink.table.data.RowData" to 
> "org.apache.flink.table.data.binary.BinaryStringData"
> --
>
> Key: FLINK-18940
> URL: https://issues.apache.org/jira/browse/FLINK-18940
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.11.1
> Environment: macOS Catalina
> java8
> scala 2.11
>Reporter: jack sun
>Priority: Major
> Attachments: 1.png, 2.png
>
>
> The source code is simple, like this; there is nothing wrong with kafkaStream. If I 
> replace $"zone_id" with another Integer-typed column it's fine,
> but with a String column, boom:
> val kafkaStream = env.addSource(
> new FlinkKafkaConsumer011[BaseLog]("base_log", new 
> BaseLogDeserializationSchema(), properties)
> )
> tenv.createTemporaryView("baselog",kafkaStream)
> tenv.from("baselog")
> .select($"zone_id")
> .toAppendStream[String]
> .print()
> the schema is a case class, like this:
> case class BaseLog(
> var timestamp: Long, 
> log_id: Int,
> fork_id: String ,
> zone_id: String,
> plat_id: String,
> ..
> )
> sql explain:
> == Abstract Syntax Tree ==
> LogicalProject(zone_id=[$4])
> +- LogicalTableScan(table=[[default_catalog, default_database, baselog]])
> == Optimized Logical Plan ==
> Calc(select=[zone_id])
> +- DataStreamScan(table=[[default_catalog, default_database, baselog]], 
> fields=[timestamp, log_id, fork_id, zone_id, plat_id,.])
> == Physical Execution Plan ==
> Stage 1 : Data Source
>   content : Source: Custom Source
>   Stage 2 : Operator
>   content : 
> SourceConversion(table=[default_catalog.default_database.baselog], 
> fields=[timestamp, log_id, fork_id, zone_id, plat_id,.])
>   ship_strategy : FORWARD
>   Stage 3 : Operator
>   content : Calc(select=[zone_id])
>   ship_strategy : FORWARD
> the exceptions below:
> /* 1 */
> /* 2 */  public class SinkConversion$4 extends 
> org.apache.flink.table.runtime.operators.TableStreamOperator
> /* 3 */  implements 
> org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> /* 4 */
> /* 5 */private final Object[] references;
> /* 6 */private transient 
> org.apache.flink.table.data.util.DataFormatConverters.StringConverter 
> converter$3;
> /* 7 */private final 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> /* 8 */
> /* 9 */public SinkConversion$4(
> /* 10 */Object[] references,
> /* 11 */org.apache.flink.streaming.runtime.tasks.StreamTask task,
> /* 12 */org.apache.flink.streaming.api.graph.StreamConfig config,
> /* 13 */org.apache.flink.streaming.api.operators.Output output,
> /* 14 */
> org.apache.flink.streaming.runtime.tasks.ProcessingTimeService 
> processingTimeService) throws Exception {
> /* 15 */  this.references = references;
> /* 16 */  converter$3 = 
> (((org.apache.flink.table.data.util.DataFormatConverters.StringConverter) 
> references[0]));
> /* 17 */  this.setup(task, config, output);
> /* 18 */  if (this instanceof 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
> /* 19 */
> ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
> /* 20 */  .setProcessingTimeService(processingTimeService);
> /* 21 */  }
> /* 22 */}
> /* 23 */
> /* 24 */@Override
> /* 25 */public void open() throws Exception {
> /* 26 */  super.open();
> /* 27 */  
> /* 28 */}
> /* 29 */
> /* 30 */@Override
> /* 31 */public void 
> processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
> element) throws Exception {
> /* 32 */  org.apache.flink.table.data.RowData in1 = 
> (org.apache.flink.table.data.RowData) element.getValue();
> /* 33 */  
> /* 34 */  
> /* 35 */  
> /* 36 */  
> /* 37 */  
> /* 38 */  output.collect(outElement.replace((java.lang.String) 
> converter$3.toExternal((org.apache.flink.table.data.binary.BinaryStringData) 
> in1)));
> /* 39 */  
> /* 40 */}
> /* 41 */
> /* 42 */
> /* 43 */
> /* 44 */@Override
> /* 45 */public void close() throws Exception {
> /* 46 */   super.close();
> /* 47 */  
> /* 48 */  

[jira] [Commented] (FLINK-18940) Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"

2020-08-13 Thread jack sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177435#comment-17177435
 ] 

jack sun commented on FLINK-18940:
--

[~twalthr] Yep, I appended it to the description.

> Cannot cast "org.apache.flink.table.data.RowData" to 
> "org.apache.flink.table.data.binary.BinaryStringData"
> --
>
> Key: FLINK-18940
> URL: https://issues.apache.org/jira/browse/FLINK-18940
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.11.1
> Environment: macOS Catalina
> java8
> scala 2.11
>Reporter: jack sun
>Priority: Major
>
> The source code is simple, like this; there is nothing wrong with kafkaStream. If I 
> replace $"zone_id" with another Integer-typed column it's fine,
> but with a String column, boom:
> val kafkaStream = env.addSource(
> new FlinkKafkaConsumer011[BaseLog]("base_log", new 
> BaseLogDeserializationSchema(), properties)
> )
> tenv.createTemporaryView("baselog",kafkaStream)
> tenv.from("baselog")
> .select($"zone_id")
> .toAppendStream[String]
> .print()
> the schema is a case class, like this:
> case class BaseLog(
> var timestamp: Long, 
> log_id: Int,
> fork_id: String ,
> zone_id: String,
> plat_id: String,
> ..
> )
> sql explain:
> == Abstract Syntax Tree ==
> LogicalProject(zone_id=[$4])
> +- LogicalTableScan(table=[[default_catalog, default_database, baselog]])
> == Optimized Logical Plan ==
> Calc(select=[zone_id])
> +- DataStreamScan(table=[[default_catalog, default_database, baselog]], 
> fields=[timestamp, log_id, fork_id, zone_id, plat_id,.])
> == Physical Execution Plan ==
> Stage 1 : Data Source
>   content : Source: Custom Source
>   Stage 2 : Operator
>   content : 
> SourceConversion(table=[default_catalog.default_database.baselog], 
> fields=[timestamp, log_id, fork_id, zone_id, plat_id,.])
>   ship_strategy : FORWARD
>   Stage 3 : Operator
>   content : Calc(select=[zone_id])
>   ship_strategy : FORWARD
> the exceptions below:
> /* 1 */
> /* 2 */  public class SinkConversion$4 extends 
> org.apache.flink.table.runtime.operators.TableStreamOperator
> /* 3 */  implements 
> org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> /* 4 */
> /* 5 */private final Object[] references;
> /* 6 */private transient 
> org.apache.flink.table.data.util.DataFormatConverters.StringConverter 
> converter$3;
> /* 7 */private final 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new 
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> /* 8 */
> /* 9 */public SinkConversion$4(
> /* 10 */Object[] references,
> /* 11 */org.apache.flink.streaming.runtime.tasks.StreamTask task,
> /* 12 */org.apache.flink.streaming.api.graph.StreamConfig config,
> /* 13 */org.apache.flink.streaming.api.operators.Output output,
> /* 14 */
> org.apache.flink.streaming.runtime.tasks.ProcessingTimeService 
> processingTimeService) throws Exception {
> /* 15 */  this.references = references;
> /* 16 */  converter$3 = 
> (((org.apache.flink.table.data.util.DataFormatConverters.StringConverter) 
> references[0]));
> /* 17 */  this.setup(task, config, output);
> /* 18 */  if (this instanceof 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
> /* 19 */
> ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
> /* 20 */  .setProcessingTimeService(processingTimeService);
> /* 21 */  }
> /* 22 */}
> /* 23 */
> /* 24 */@Override
> /* 25 */public void open() throws Exception {
> /* 26 */  super.open();
> /* 27 */  
> /* 28 */}
> /* 29 */
> /* 30 */@Override
> /* 31 */public void 
> processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
> element) throws Exception {
> /* 32 */  org.apache.flink.table.data.RowData in1 = 
> (org.apache.flink.table.data.RowData) element.getValue();
> /* 33 */  
> /* 34 */  
> /* 35 */  
> /* 36 */  
> /* 37 */  
> /* 38 */  output.collect(outElement.replace((java.lang.String) 
> converter$3.toExternal((org.apache.flink.table.data.binary.BinaryStringData) 
> in1)));
> /* 39 */  
> /* 40 */}
> /* 41 */
> /* 42 */
> /* 43 */
> /* 44 */@Override
> /* 45 */public void close() throws Exception {
> /* 46 */   super.close();
> /* 47 

[jira] [Updated] (FLINK-18940) Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"

2020-08-13 Thread jack sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jack sun updated FLINK-18940:
-
Description: 
The source code is simple, like this; there is nothing wrong with kafkaStream. If I 
replace $"zone_id" with another Integer-typed column it's fine,
but with a String column, boom (a possible workaround is sketched after the schema below):

val kafkaStream = env.addSource(
new FlinkKafkaConsumer011[BaseLog]("base_log", new 
BaseLogDeserializationSchema(), properties)
)

tenv.createTemporaryView("baselog",kafkaStream)

tenv.from("baselog")
.select($"zone_id")
.toAppendStream[String]
.print()


the schema is a case class, like this:

case class BaseLog(
var timestamp: Long, 
log_id: Int,
fork_id: String ,
zone_id: String,
plat_id: String,
..
)
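An untested workaround sketch, assuming the failure is specific to converting a single String column straight to DataStream[String]: go through Row and extract the field manually, staying off the direct one-String-column conversion that produces the bad cast. This reuses tenv and the baselog view from the snippet above:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

tenv.from("baselog")
  .select($"zone_id")
  .toAppendStream[Row]                     // stay on the Row conversion path
  .map(_.getField(0).asInstanceOf[String]) // extract the column ourselves
  .print()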

sql explain:

== Abstract Syntax Tree ==
LogicalProject(zone_id=[$4])
+- LogicalTableScan(table=[[default_catalog, default_database, baselog]])

== Optimized Logical Plan ==
Calc(select=[zone_id])
+- DataStreamScan(table=[[default_catalog, default_database, baselog]], 
fields=[timestamp, log_id, fork_id, zone_id, plat_id,.])

== Physical Execution Plan ==
Stage 1 : Data Source
content : Source: Custom Source

Stage 2 : Operator
content : 
SourceConversion(table=[default_catalog.default_database.baselog], 
fields=[timestamp, log_id, fork_id, zone_id, plat_id,.])
ship_strategy : FORWARD

Stage 3 : Operator
content : Calc(select=[zone_id])
ship_strategy : FORWARD

the exceptions below:


/* 1 */
/* 2 */  public class SinkConversion$4 extends 
org.apache.flink.table.runtime.operators.TableStreamOperator
/* 3 */  implements 
org.apache.flink.streaming.api.operators.OneInputStreamOperator {
/* 4 */
/* 5 */private final Object[] references;
/* 6 */private transient 
org.apache.flink.table.data.util.DataFormatConverters.StringConverter 
converter$3;
/* 7 */private final 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
/* 8 */
/* 9 */public SinkConversion$4(
/* 10 */Object[] references,
/* 11 */org.apache.flink.streaming.runtime.tasks.StreamTask task,
/* 12 */org.apache.flink.streaming.api.graph.StreamConfig config,
/* 13 */org.apache.flink.streaming.api.operators.Output output,
/* 14 */
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService 
processingTimeService) throws Exception {
/* 15 */  this.references = references;
/* 16 */  converter$3 = 
(((org.apache.flink.table.data.util.DataFormatConverters.StringConverter) 
references[0]));
/* 17 */  this.setup(task, config, output);
/* 18 */  if (this instanceof 
org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
/* 19 */
((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
/* 20 */  .setProcessingTimeService(processingTimeService);
/* 21 */  }
/* 22 */}
/* 23 */
/* 24 */@Override
/* 25 */public void open() throws Exception {
/* 26 */  super.open();
/* 27 */  
/* 28 */}
/* 29 */
/* 30 */@Override
/* 31 */public void 
processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
element) throws Exception {
/* 32 */  org.apache.flink.table.data.RowData in1 = 
(org.apache.flink.table.data.RowData) element.getValue();
/* 33 */  
/* 34 */  
/* 35 */  
/* 36 */  
/* 37 */  
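/*        NOTE: the cast below is the failure point named in the title:
          in1 is the entire RowData row, yet it is cast to BinaryStringData
          as if it were the single String field. */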
/* 38 */  output.collect(outElement.replace((java.lang.String) 
converter$3.toExternal((org.apache.flink.table.data.binary.BinaryStringData) 
in1)));
/* 39 */  
/* 40 */}
/* 41 */
/* 42 */
/* 43 */
/* 44 */@Override
/* 45 */public void close() throws Exception {
/* 46 */   super.close();
/* 47 */  
/* 48 */}
/* 49 */
/* 50 */
/* 51 */  }
/* 52 */

19:53:58,610 WARN  org.apache.flink.runtime.taskmanager.Task
 - Source: Custom Source -> 
SourceConversion(table=[default_catalog.default_database.baselog], 
fields=[timestamp, zone_id,..]) -> Calc(select=[zone_id]) -> 
SinkConversionToString -> Sink: Print to Std. Out (2/16) 
(8e0da69492bc45fffdbc17cb32c8f99b) switched from RUNNING to FAILED.
java.lang.RuntimeException: Could not instantiate generated class 
'SinkConversion$4'
at 
org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)
at 
org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40)
at 

[jira] [Updated] (FLINK-18940) Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"

2020-08-13 Thread jack sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jack sun updated FLINK-18940:
-
Description: 
The source code is simple, like this; there is nothing wrong with kafkaStream. If I 
replace $"zone_id" with another Integer-typed column it's fine,
but with a String column, boom:

val kafkaStream = env.addSource(
new FlinkKafkaConsumer011[BaseLog]("base_log", new 
BaseLogDeserializationSchema(), properties)
)

tenv.createTemporaryView("baselog",kafkaStream)

tenv.from("baselog")
.select($"zone_id")
.toAppendStream[String]
.print()


the schema is a case class, like this:

case class BaseLog(
var timestamp: Long, 
log_id: Int,
fork_id: String ,
zone_id: String,
plat_id: String,
..
)


the exceptions below:


/* 1 */
/* 2 */  public class SinkConversion$4 extends 
org.apache.flink.table.runtime.operators.TableStreamOperator
/* 3 */  implements 
org.apache.flink.streaming.api.operators.OneInputStreamOperator {
/* 4 */
/* 5 */private final Object[] references;
/* 6 */private transient 
org.apache.flink.table.data.util.DataFormatConverters.StringConverter 
converter$3;
/* 7 */private final 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
/* 8 */
/* 9 */public SinkConversion$4(
/* 10 */Object[] references,
/* 11 */org.apache.flink.streaming.runtime.tasks.StreamTask task,
/* 12 */org.apache.flink.streaming.api.graph.StreamConfig config,
/* 13 */org.apache.flink.streaming.api.operators.Output output,
/* 14 */
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService 
processingTimeService) throws Exception {
/* 15 */  this.references = references;
/* 16 */  converter$3 = 
(((org.apache.flink.table.data.util.DataFormatConverters.StringConverter) 
references[0]));
/* 17 */  this.setup(task, config, output);
/* 18 */  if (this instanceof 
org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
/* 19 */
((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
/* 20 */  .setProcessingTimeService(processingTimeService);
/* 21 */  }
/* 22 */}
/* 23 */
/* 24 */@Override
/* 25 */public void open() throws Exception {
/* 26 */  super.open();
/* 27 */  
/* 28 */}
/* 29 */
/* 30 */@Override
/* 31 */public void 
processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
element) throws Exception {
/* 32 */  org.apache.flink.table.data.RowData in1 = 
(org.apache.flink.table.data.RowData) element.getValue();
/* 33 */  
/* 34 */  
/* 35 */  
/* 36 */  
/* 37 */  
/* 38 */  output.collect(outElement.replace((java.lang.String) 
converter$3.toExternal((org.apache.flink.table.data.binary.BinaryStringData) 
in1)));
/* 39 */  
/* 40 */}
/* 41 */
/* 42 */
/* 43 */
/* 44 */@Override
/* 45 */public void close() throws Exception {
/* 46 */   super.close();
/* 47 */  
/* 48 */}
/* 49 */
/* 50 */
/* 51 */  }
/* 52 */

19:53:58,610 WARN  org.apache.flink.runtime.taskmanager.Task
 - Source: Custom Source -> 
SourceConversion(table=[default_catalog.default_database.baselog], 
fields=[timestamp, zone_id,..]) -> Calc(select=[zone_id]) -> 
SinkConversionToString -> Sink: Print to Std. Out (2/16) 
(8e0da69492bc45fffdbc17cb32c8f99b) switched from RUNNING to FAILED.
java.lang.RuntimeException: Could not instantiate generated class 
'SinkConversion$4'
at 
org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)
at 
org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40)
at 
org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:70)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:470)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
at 

[jira] [Created] (FLINK-18940) Cannot cast "org.apache.flink.table.data.RowData" to "org.apache.flink.table.data.binary.BinaryStringData"

2020-08-13 Thread jack sun (Jira)
jack sun created FLINK-18940:


 Summary: Cannot cast "org.apache.flink.table.data.RowData" to 
"org.apache.flink.table.data.binary.BinaryStringData"
 Key: FLINK-18940
 URL: https://issues.apache.org/jira/browse/FLINK-18940
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.11.1
 Environment: macOS Catalina
java8
scala 2.11
Reporter: jack sun


/* 1 */
/* 2 */  public class SinkConversion$4 extends 
org.apache.flink.table.runtime.operators.TableStreamOperator
/* 3 */  implements 
org.apache.flink.streaming.api.operators.OneInputStreamOperator {
/* 4 */
/* 5 */private final Object[] references;
/* 6 */private transient 
org.apache.flink.table.data.util.DataFormatConverters.StringConverter 
converter$3;
/* 7 */private final 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
/* 8 */
/* 9 */public SinkConversion$4(
/* 10 */Object[] references,
/* 11 */org.apache.flink.streaming.runtime.tasks.StreamTask task,
/* 12 */org.apache.flink.streaming.api.graph.StreamConfig config,
/* 13 */org.apache.flink.streaming.api.operators.Output output,
/* 14 */
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService 
processingTimeService) throws Exception {
/* 15 */  this.references = references;
/* 16 */  converter$3 = 
(((org.apache.flink.table.data.util.DataFormatConverters.StringConverter) 
references[0]));
/* 17 */  this.setup(task, config, output);
/* 18 */  if (this instanceof 
org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
/* 19 */
((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
/* 20 */  .setProcessingTimeService(processingTimeService);
/* 21 */  }
/* 22 */}
/* 23 */
/* 24 */@Override
/* 25 */public void open() throws Exception {
/* 26 */  super.open();
/* 27 */  
/* 28 */}
/* 29 */
/* 30 */@Override
/* 31 */public void 
processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
element) throws Exception {
/* 32 */  org.apache.flink.table.data.RowData in1 = 
(org.apache.flink.table.data.RowData) element.getValue();
/* 33 */  
/* 34 */  
/* 35 */  
/* 36 */  
/* 37 */  
/* 38 */  output.collect(outElement.replace((java.lang.String) 
converter$3.toExternal((org.apache.flink.table.data.binary.BinaryStringData) 
in1)));
/* 39 */  
/* 40 */}
/* 41 */
/* 42 */
/* 43 */
/* 44 */@Override
/* 45 */public void close() throws Exception {
/* 46 */   super.close();
/* 47 */  
/* 48 */}
/* 49 */
/* 50 */
/* 51 */  }
/* 52 */

19:53:58,610 WARN  org.apache.flink.runtime.taskmanager.Task
 - Source: Custom Source -> 
SourceConversion(table=[default_catalog.default_database.baselog], 
fields=[timestamp, zone_id,..]) -> Calc(select=[zone_id]) -> 
SinkConversionToString -> Sink: Print to Std. Out (2/16) 
(8e0da69492bc45fffdbc17cb32c8f99b) switched from RUNNING to FAILED.
java.lang.RuntimeException: Could not instantiate generated class 
'SinkConversion$4'
at 
org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)
at 
org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40)
at 
org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:70)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:470)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at