[jira] [Created] (FLINK-18868) Converting data into JSON format with flinksql
mzz created FLINK-18868:
---
Summary: Converting data into JSON format with flinksql
Key: FLINK-18868
URL: https://issues.apache.org/jira/browse/FLINK-18868
Project: Flink
Issue Type: Bug
Affects Versions: 1.10.0
Reporter: mzz

I would like to convert data into JSON format with Flink SQL, similar to Spark's toJson function.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
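For illustration only, the kind of conversion being requested could look roughly like the plain-Scala sketch below. Everything in it is an assumption made for this example: `ToJsonSketch`, `toJson`, and `escape` are hypothetical names, not Flink or Spark APIs, and the sketch handles only strings, numbers, booleans, maps, and collections. In a Flink job, logic like this would presumably have to be wrapped in a user-defined function before it could be called from SQL.

```scala
object ToJsonSketch {
  // Escape the characters that must not appear raw inside a JSON string.
  private def escape(s: String): String = s.flatMap {
    case '"'          => "\\\""
    case '\\'         => "\\\\"
    case '\n'         => "\\n"
    case '\r'         => "\\r"
    case '\t'         => "\\t"
    case c if c < ' ' => "\\u%04x".format(c.toInt)
    case c            => c.toString
  }

  // Hypothetical helper: render a value as a JSON string.
  // Maps become objects, other collections become arrays,
  // and unknown types fall back to their toString as a JSON string.
  def toJson(value: Any): String = value match {
    case null         => "null"
    case s: String    => "\"" + escape(s) + "\""
    case n: Number    => n.toString
    case b: Boolean   => b.toString
    case m: Map[_, _] =>
      m.iterator
        .map { case (k, v) => "\"" + escape(k.toString) + "\":" + toJson(v) }
        .mkString("{", ",", "}")
    case xs: Iterable[_] => xs.iterator.map(toJson).mkString("[", ",", "]")
    case other           => "\"" + escape(other.toString) + "\""
  }

  def main(args: Array[String]): Unit = {
    // prints {"ad_id":"a1","ecpm":1.5}
    println(toJson(Map("ad_id" -> "a1", "ecpm" -> 1.5)))
  }
}
```

The field names `ad_id` and `ecpm` are borrowed from the queries elsewhere in this archive purely as sample data.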
[jira] [Comment Edited] (FLINK-18741) ProcessWindowFunction's process function exception
[ https://issues.apache.org/jira/browse/FLINK-18741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17171876#comment-17171876 ]

mzz edited comment on FLINK-18741 at 8/6/20, 2:53 AM:
--
[~aljoscha] Thanks for your reply. It does not throw any exception.

When I use ProcessWindowFunction and override the process function, I traverse the iterator (elements) inside that function. I define a variable in the process function, assign it an initial value, and add 1 to it while traversing the iterator. However, outside the loop the value is back at its initial value: when the window ends, the variable has been reset.

*code:*
{code:java}
elements.foreach(e => {
  if ("adreq".equals(e._3)) {
    requestNum += 1
    println(key._1, requestNum)
    // The values printed here look like:
    // (key,1)
    // (key,2)
    // (key,3)
  }
})
// But the value printed outside the loop is always:
// (key,0)
println(requestNum, key._1)
{code}
[jira] [Comment Edited] (FLINK-18741) ProcessWindowFunction's process function exception
[ https://issues.apache.org/jira/browse/FLINK-18741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17171876#comment-17171876 ]

mzz edited comment on FLINK-18741 at 8/6/20, 2:52 AM:
--
[~aljoscha] Thanks for your reply.

When I use ProcessWindowFunction and override the process function, I traverse the iterator (elements) inside that function. I define a variable in the process function, assign it an initial value, and add 1 to it while traversing the iterator. However, outside the loop the value is back at its initial value: when the window ends, the variable has been reset.

*code:*
{code:java}
elements.foreach(e => {
  if ("adreq".equals(e._3)) {
    requestNum += 1
    println(key._1, requestNum)
    // The values printed here look like:
    // (key,1)
    // (key,2)
    // (key,3)
  }
})
// But the value printed outside the loop is always:
// (key,0)
println(requestNum, key._1)
{code}
[jira] [Commented] (FLINK-18741) ProcessWindowFunction's process function exception
[ https://issues.apache.org/jira/browse/FLINK-18741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17171876#comment-17171876 ]

mzz commented on FLINK-18741:
-
When I use ProcessWindowFunction and override the process function, I traverse the iterator (elements) inside that function. I define a variable in the process function, assign it an initial value, and add 1 to it while traversing the iterator. However, outside the loop the value is back at its initial value: when the window ends, the variable has been reset.

*code:*
{code:java}
elements.foreach(e => {
  if ("adreq".equals(e._3)) {
    requestNum += 1
    println(key._1, requestNum)
    // The values printed here look like:
    // (key,1)
    // (key,2)
    // (key,3)
  }
})
// But the value printed outside the loop is always:
// (key,0)
println(requestNum, key._1)
{code}
[jira] [Created] (FLINK-18741) ProcessWindowFunction's process function exception
mzz created FLINK-18741:
---
Summary: ProcessWindowFunction's process function exception
Key: FLINK-18741
URL: https://issues.apache.org/jira/browse/FLINK-18741
Project: Flink
Issue Type: Bug
Components: API / DataStream
Affects Versions: 1.10.0
Reporter: mzz

I use ProcessWindowFunction to implement a PV calculation, but when I override process, the user-defined state value cannot be returned.

code:
{code:java}
tem.keyBy(x => (x._1, x._2, x._4, x._5, x._6, x._7, x._8))
  .timeWindow(Time.seconds(15 * 60)) // 15 min window
  .process(new ProcessWindowFunction[(String, String, String, String, String, String, String, String, String), CkResult, (String, String, String, String, String, String, String), TimeWindow] {
    var clickCount: ValueState[Long] = _
    var requestCount: ValueState[Long] = _ // highlighted by the reporter
    var returnCount: ValueState[Long] = _
    var videoCount: ValueState[Long] = _
    var noVideoCount: ValueState[Long] = _

    override def open(parameters: Configuration): Unit = {
      clickCount = getRuntimeContext.getState(new ValueStateDescriptor("clickCount", classOf[Long]))
      // highlighted by the reporter
      requestCount = getRuntimeContext.getState(new ValueStateDescriptor("requestCount", classOf[Long]))
      returnCount = getRuntimeContext.getState(new ValueStateDescriptor("returnCount", classOf[Long]))
      videoCount = getRuntimeContext.getState(new ValueStateDescriptor("videoCount", classOf[Long]))
      noVideoCount = getRuntimeContext.getState(new ValueStateDescriptor("noVideoCount", classOf[Long]))
    }

    override def process(key: (String, String, String, String, String, String, String), context: Context, elements: Iterable[(String, String, String, String, String, String, String, String, String)], out: Collector[CkResult]) = {
      try {
        var clickNum: Long = clickCount.value
        val dateNow = LocalDateTime.now().format(DateTimeFormatter.ofPattern("MMdd")).toLong
        var requestNum: Long = requestCount.value
        var returnNum: Long = returnCount.value
        var videoNum: Long = videoCount.value
        var noVideoNum: Long = noVideoCount.value
        if (requestNum == null) {
          requestNum = 0
        }

        val ecpm = key._7.toDouble.formatted("%.2f").toFloat
        val created_at = getSecondTimestampTwo(new Date)

        // highlighted by the reporter (through the println after the loop)
        elements.foreach(e => {
          if ("adreq".equals(e._3)) {
            requestNum += 1
            println(key._1, requestNum)
          }
        })
        requestCount.update(requestNum)
        println(requestNum, key._1)

        out.collect(CkResult(dateNow, (created_at - getZero_time) / (60 * 15), key._2, key._3, key._4, key._5, key._3 + "_" + key._4 + "_" + key._5, key._6, key._1, requestCount.value, returnCount.value, fill_rate, noVideoCount.value + videoCount.value, expose_rate, clickCount.value, click_rate, ecpm, (noVideoCount.value * ecpm + videoCount.value * ecpm / 1000.toFloat).formatted("%.2f").toFloat, created_at))
      }
      catch {
        case e: Exception => println(key, e)
      }
    }
  })
{code}
{code:java}
elements.foreach(e => {
  if ("adreq".equals(e._3)) {
    requestNum += 1
    println(key._1, requestNum)
    // The values printed here look like:
    // (key,1)
    // (key,2)
    // (key,3)
  }
})
// But the value printed outside the loop is always:
// (key,0)
println(requestNum, key._1)
{code}
Can anyone help me, please? Thanks.
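As a point of comparison outside Flink, the counting loop from the report can be sketched in plain Scala as below. The names `LocalCounterSketch` and `countAdReq`, and the three-field tuples, are hypothetical stand-ins introduced only for this illustration (the reported job uses nine-field tuples and Flink keyed state). The sketch shows two things about ordinary Scala semantics: a local `var` updated inside `foreach` keeps its value after the loop, and a `null` cast to the primitive `Long` becomes 0, so a later `== null` check on a `Long` cannot succeed. It does not reproduce or explain the behaviour described in the issue.

```scala
object LocalCounterSketch {
  // Count the elements whose third field is "adreq", starting from `initial`.
  // The closure passed to foreach captures `requestNum` by reference,
  // so the increments are still visible after the loop finishes.
  def countAdReq(elements: Iterable[(String, String, String)], initial: Long): Long = {
    var requestNum: Long = initial
    elements.foreach { e =>
      if ("adreq" == e._3) requestNum += 1
    }
    requestNum
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample data, not taken from the reported job.
    val elements = Seq(("k", "x", "adreq"), ("k", "x", "click"), ("k", "x", "adreq"))
    println(countAdReq(elements, 0L)) // prints 2

    // A null cast to a primitive Long is 0.
    val unset: Long = null.asInstanceOf[Long]
    println(unset) // prints 0
  }
}
```

If the same loop behaves differently inside the window function, comparing against a standalone version like this may help narrow down whether the difference comes from the loop itself or from the surrounding state handling.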
[jira] [Commented] (FLINK-18652) JDBCAppendTableSink to ClickHouse (data always repeating)
[ https://issues.apache.org/jira/browse/FLINK-18652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17161870#comment-17161870 ]

mzz commented on FLINK-18652:
-
[~jark] In the UI the sink shows no data, but sometimes data does get inserted. Moreover, the checkpoint of the sink always fails. When I run the program locally, data is always inserted.

> JDBCAppendTableSink to ClickHouse (data always repeating)
> --
>
> Key: FLINK-18652
> URL: https://issues.apache.org/jira/browse/FLINK-18652
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.10.0
> Reporter: mzz
> Priority: Critical
> Attachments: FLINK-UI.png, checkpoint-failed.png
>
> Hi all,
> The data pipeline is: Kafka -> Flink SQL -> ClickHouse.
> The window is 15 minutes, but 15 minutes after the first window, the data keeps being written to ClickHouse repeatedly. Please help me, thanks.
> {code:java}
> // data source from kafka
> streamTableEnvironment.sqlUpdate(createTableSql)
> LOG.info("kafka source table has created !")
> val groupTable = streamTableEnvironment.sqlQuery(tempSql)
> streamTableEnvironment.createTemporaryView("aggs_temp_table", groupTable)
> // this is the window sql, it uses ProcessingTime
> val re_table = streamTableEnvironment.sqlQuery(windowSql)
> re_table.printSchema()
> //groupTable.printSchema()
> val rr = streamTableEnvironment.toAppendStream[Result](re_table)
> // The data here is printed normally
> rr.print()
> streamTableEnvironment.createTemporaryView("result_table", rr)
> val s = streamTableEnvironment.sqlQuery(sql)
> // sink to clickhouse
> val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
>   .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
>   .setDBUrl(URL)
>   .setQuery(insertCKSql)
>   .setUsername(USERNAME)
>   .setPassword(PASSWORD)
>   .setBatchSize(1)
>   .setParameterTypes(
>     Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
>     Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG, Types.FLOAT,
>     Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG()
>   )
>   .build()
> streamTableEnvironment.registerTableSink("ckResult",
>   Array[String]("data_date", "point", "platform", "page_name", "component_name", "booth_name", "position1", "advertiser",
>     "adv_code", "request_num", "return_num", "fill_rate", "expose_num", "expose_rate", "click_num", "click_rate", "ecpm", "income", "created_at"),
>   Array[TypeInformation[_]](Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
>     Types.STRING, Types.LONG, Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG()),
>   sink)
> // insert into TableSink
> s.insertInto("ckResult")
> {code}
[jira] [Commented] (FLINK-18652) JDBCAppendTableSink to ClickHouse (data always repeating)
[ https://issues.apache.org/jira/browse/FLINK-18652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17161867#comment-17161867 ]

mzz commented on FLINK-18652:
-
[~jark] Yes. In the UI the sink shows no data, but the number of rows in ClickHouse keeps increasing, which is very strange. After I resubmit the job, no data is inserted into ClickHouse after a window, yet the same code does insert data into ClickHouse when run locally in IDEA.
[jira] [Comment Edited] (FLINK-18652) JDBCAppendTableSink to ClickHouse (data always repeating)
[ https://issues.apache.org/jira/browse/FLINK-18652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17161751#comment-17161751 ]

mzz edited comment on FLINK-18652 at 7/21/20, 6:21 AM:
---
[~jark] When I look at the UI, I find that the sink subtask has no data, although the other operators do, and the checkpoint always fails on the sink operator.

!FLINK-UI.png!
[jira] [Commented] (FLINK-18652) JDBCAppendTableSink to ClickHouse (data always repeating)
[ https://issues.apache.org/jira/browse/FLINK-18652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17161751#comment-17161751 ]

mzz commented on FLINK-18652:
-
When I look at the UI, I find that the sink subtask has no data, although the other operators do, and the checkpoint always fails on the sink operator.

!FLINK-UI.png!
[jira] [Updated] (FLINK-18652) JDBCAppendTableSink to ClickHouse (data always repeating)
[ https://issues.apache.org/jira/browse/FLINK-18652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

mzz updated FLINK-18652:
Attachment: FLINK-UI.png
[jira] [Updated] (FLINK-18652) JDBCAppendTableSink to ClickHouse (data always repeating)
[ https://issues.apache.org/jira/browse/FLINK-18652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

mzz updated FLINK-18652:
Attachment: checkpoint-failed.png
[jira] [Commented] (FLINK-18652) JDBCAppendTableSink to ClickHouse (data always repeating)
[ https://issues.apache.org/jira/browse/FLINK-18652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17161741#comment-17161741 ]

mzz commented on FLINK-18652:
---
[~jark]
*tempSql:*
{code:java}
val tempSql =
  """
    |select
    |ad_id,
    |projectname,
    |data_date,
    |event_type,
    |ts,
    |event_id,
    |event_page,
    |event_component,
    |event_booth,
    |ad_type,
    |ecpm
    |FROM aggs_test
    |where projectname is not null and projectname <> '' and projectname<> ' ' and ad_id is not null and ad_id <> '' and ad_id<> ' '
  """.stripMargin
{code}

> JDBCAppendTableSink to ClickHouse (data always repeating)
> ---
>
> Key: FLINK-18652
> URL: https://issues.apache.org/jira/browse/FLINK-18652
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.10.0
> Reporter: mzz
> Priority: Critical
>
> Hi all,
> The data stream is: kafka -> flinkSQL -> clickhouse.
> The window is 15 minutes, but starting 15 minutes after the first output, the same data keeps being written to ClickHouse repeatedly. Please help me, thanks.
> {code:java}
> // data source from kafka
> streamTableEnvironment.sqlUpdate(createTableSql)
> LOG.info("kafka source table has created !")
> val groupTable = streamTableEnvironment.sqlQuery(tempSql)
> streamTableEnvironment.createTemporaryView("aggs_temp_table", groupTable)
> // this is the window SQL; it uses processing time
> val re_table = streamTableEnvironment.sqlQuery(windowSql)
> re_table.printSchema()
> //groupTable.printSchema()
> val rr = streamTableEnvironment.toAppendStream[Result](re_table)
> // The data here is printed normally
> rr.print()
> streamTableEnvironment.createTemporaryView("result_table", rr)
> val s = streamTableEnvironment.sqlQuery(sql)
> // sink to clickhouse
> val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
>   .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
>   .setDBUrl(URL)
>   .setQuery(insertCKSql)
>   .setUsername(USERNAME)
>   .setPassword(PASSWORD)
>   .setBatchSize(1)
>   .setParameterTypes(
>     Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
>     Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG, Types.FLOAT,
>     Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG()
>   )
>   .build()
> streamTableEnvironment.registerTableSink("ckResult",
>   Array[String]("data_date", "point", "platform", "page_name", "component_name", "booth_name", "position1", "advertiser",
>     "adv_code", "request_num", "return_num", "fill_rate", "expose_num", "expose_rate", "click_num", "click_rate", "ecpm", "income", "created_at"),
>   Array[TypeInformation[_]](Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
>     Types.STRING, Types.LONG, Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG()),
>   sink)
> // insert into TableSink
> s.insertInto("ckResult")
> {code}
[jira] [Closed] (FLINK-18443) The operator name select: (ip, ts, count, environment.access AS access, environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the 80 characters length l
[ https://issues.apache.org/jira/browse/FLINK-18443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

mzz closed FLINK-18443.
---
    Fix Version/s: 1.10.0
    Resolution: Fixed

> The operator name select: (ip, ts, count, environment.access AS access, environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the 80 characters length limit and was truncated
> ---
>
> Key: FLINK-18443
> URL: https://issues.apache.org/jira/browse/FLINK-18443
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.9.0
> Reporter: mzz
> Priority: Major
> Fix For: 1.10.0
>
> *Schema:*
> {code:java}
> .withSchema(new Schema()
>   .field("ip", Types.STRING())
>   .field("ts", Types.STRING())
>   .field("procTime", Types.SQL_TIMESTAMP).proctime()
>   .field("environment", schemaEnvironment)
>   .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,
>     Types.ROW(Array("count","sid", "eventid","params"),
>       Array[TypeInformation[_]](Types.STRING(),Types.STRING(),
>         Types.STRING(),Types.ROW(Array("adid","adtype","ecpm"),Array[TypeInformation[_]](Types.STRING(),Types.STRING(),Types.STRING()))
> )
> {code}
> *When I execute this SQL:*
> {code:java}
> val sql =
>   """
>     |SELECT
>     |ip,
>     |ts,
>     |params.ad,
>     |params.adtype,
>     |eventid,
>     |procTime
>     |FROM aggs_test
>     |CROSS JOIN UNNEST(advs) AS t (`count`,sid, eventid,params)
>     |""".stripMargin
> {code}
> *I get a warning. The console keeps printing it and there is no normal output:*
> {code:java}
> 09:38:38,694 WARN org.apache.flink.metrics.MetricGroup - The operator name correlate: table(explode($cor0.advs)), select: ip, ts, procTime, advs, sid, eventid, params exceeded the 80 characters length limit and was truncated.
> {code}
> *After I changed the schema as follows, the warning still appears occasionally, but the output is normal. I changed the type of 'params' from Types.ROW to Types.STRING:*
> {code:java}
> .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,
>   Types.ROW(Array("count", "sid", "eventid", "params"),
>     Array[TypeInformation[_]](Types.STRING(), Types.STRING(),
>       Types.STRING(), Types.STRING()
> {code}
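The warning quoted in this issue says that an operator name longer than 80 characters was truncated. The following is a minimal sketch of what such a truncation amounts to, using the operator name from the log line above. The class `OperatorNameTruncation`, the method `truncate`, and the constant `MAX_LENGTH` are hypothetical names chosen for illustration; this is not Flink's own code, and the limit of 80 is taken only from the warning text.

```java
class OperatorNameTruncation {
    // Assumed limit, taken from the "80 characters" mentioned in the warning text.
    static final int MAX_LENGTH = 80;

    // Returns the name unchanged if it fits, otherwise its first MAX_LENGTH characters.
    static String truncate(String name) {
        return name.length() > MAX_LENGTH ? name.substring(0, MAX_LENGTH) : name;
    }

    public static void main(String[] args) {
        // Operator name copied from the warning in the report.
        String name = "correlate: table(explode($cor0.advs)), select: ip, ts, procTime, advs, sid, eventid, params";
        String shortened = truncate(name);
        System.out.println(name.length() + " -> " + shortened.length());
        System.out.println(shortened);
    }
}
```

Under this reading, the warning concerns only the name under which the operator is reported, which would be consistent with the reporter seeing normal output while the warning still appears occasionally.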
[jira] [Closed] (FLINK-18575) Failed to send data to Kafka
[ https://issues.apache.org/jira/browse/FLINK-18575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

mzz closed FLINK-18575.
---
    Release Note: Kafka had an error; it is not related to Flink
    Resolution: Not A Problem

> Failed to send data to Kafka
> ---
>
> Key: FLINK-18575
> URL: https://issues.apache.org/jira/browse/FLINK-18575
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.10.0
> Reporter: mzz
> Priority: Major
>
> Flink version: 1.10.0
> Kafka version: 2.2
> *code:*
> {code:java}
> private def producerKafka(aggs_result: DataStream[String], topic: String, parallelism: Int) = {
>   val kafkaPro = new Properties()
>   kafkaPro.setProperty("bootstrap.servers", SINK_BROKERS)
>   kafkaPro.setProperty("zookeeper.connect", SINK_ZK)
>   kafkaPro.setProperty("request.timeout.ms", "1")
>   kafkaPro.setProperty("compression.type", "snappy")
>   kafkaPro.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "")
>   // With the retries parameter set, when a Kafka partition leader switch occurs, Flink does not restart but makes 5 attempts:
>   kafkaPro.setProperty(ProducerConfig.RETRIES_CONFIG, "5")
>   val kafka = new FlinkKafkaProducer[String](topic, new ResultDtSerialization(topic), kafkaPro, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
>   aggs_result.addSink(kafka).setParallelism(parallelism)
> }
> {code}
> *When I use this code to produce to Kafka, it reports an error:*
> {code:java}
> 2020-07-13 10:25:47,624 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Pending record count must be zero at this point: 1
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
>   at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:651)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:562)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:480)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Pending record count must be zero at this point: 1
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
>   ... 8 more
> {code}
[jira] [Created] (FLINK-18652) JDBCAppendTableSink to ClickHouse (data always repeating)
mzz created FLINK-18652:
---

Summary: JDBCAppendTableSink to ClickHouse (data always repeating)
Key: FLINK-18652
URL: https://issues.apache.org/jira/browse/FLINK-18652
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.10.0
Reporter: mzz

Hi all,
The data stream is: kafka -> flinkSQL -> clickhouse.
The window is 15 minutes, but starting 15 minutes after the first output, the same data keeps being written to ClickHouse repeatedly. Please help me, thanks.
{code:java}
// data source from kafka
streamTableEnvironment.sqlUpdate(createTableSql)
LOG.info("kafka source table has created !")
val groupTable = streamTableEnvironment.sqlQuery(tempSql)
streamTableEnvironment.createTemporaryView("aggs_temp_table", groupTable)
// this is the window SQL; it uses processing time
val re_table = streamTableEnvironment.sqlQuery(windowSql)
re_table.printSchema()
//groupTable.printSchema()
val rr = streamTableEnvironment.toAppendStream[Result](re_table)
// The data here is printed normally
rr.print()
streamTableEnvironment.createTemporaryView("result_table", rr)
val s = streamTableEnvironment.sqlQuery(sql)
// sink to clickhouse
val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
  .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
  .setDBUrl(URL)
  .setQuery(insertCKSql)
  .setUsername(USERNAME)
  .setPassword(PASSWORD)
  .setBatchSize(1)
  .setParameterTypes(
    Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
    Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG, Types.FLOAT,
    Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG()
  )
  .build()
streamTableEnvironment.registerTableSink("ckResult",
  Array[String]("data_date", "point", "platform", "page_name", "component_name", "booth_name", "position1", "advertiser",
    "adv_code", "request_num", "return_num", "fill_rate", "expose_num", "expose_rate", "click_num", "click_rate", "ecpm", "income", "created_at"),
  Array[TypeInformation[_]](Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
    Types.STRING, Types.LONG, Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG()),
  sink)
// insert into TableSink
s.insertInto("ckResult")
{code}
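The sink registration in this report depends on two lists lining up by position: the field names and the field types passed to `registerTableSink`. The sketch below pairs the two lists from the report and fails fast if their lengths differ. It is only an illustration of that positional alignment and does not address the repeated-data problem itself. The class `SinkColumnCheck` and the method `pair` are hypothetical, and the types are represented as plain strings rather than Flink `TypeInformation` objects.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SinkColumnCheck {
    // Field names copied from the registerTableSink call in the report.
    static final String[] FIELD_NAMES = {
        "data_date", "point", "platform", "page_name", "component_name", "booth_name",
        "position1", "advertiser", "adv_code", "request_num", "return_num", "fill_rate",
        "expose_num", "expose_rate", "click_num", "click_rate", "ecpm", "income", "created_at"
    };

    // Type names in the same order as the Types.* arguments in the report.
    static final String[] FIELD_TYPES = {
        "LONG", "LONG", "STRING", "STRING", "STRING", "STRING",
        "STRING", "STRING", "STRING", "LONG", "LONG", "FLOAT",
        "LONG", "FLOAT", "LONG", "FLOAT", "FLOAT", "FLOAT", "LONG"
    };

    // Pairs each name with the type at the same position, preserving order.
    static Map<String, String> pair(String[] names, String[] types) {
        if (names.length != types.length) {
            throw new IllegalArgumentException(
                "names=" + names.length + " but types=" + types.length);
        }
        Map<String, String> columns = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            columns.put(names[i], types[i]);
        }
        return columns;
    }

    public static void main(String[] args) {
        pair(FIELD_NAMES, FIELD_TYPES)
            .forEach((name, type) -> System.out.println(name + " : " + type));
    }
}
```

The placeholders in the insert statement (`insertCKSql`, not shown in the report) would need to follow the same order.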
[jira] [Commented] (FLINK-18575) Failed to send data to Kafka
[ https://issues.apache.org/jira/browse/FLINK-18575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17161658#comment-17161658 ]

mzz commented on FLINK-18575:
---
The problem has been resolved. Kafka had an error; it is not related to Flink.

> Failed to send data to Kafka
> ---
>
> Key: FLINK-18575
> URL: https://issues.apache.org/jira/browse/FLINK-18575
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.10.0
> Reporter: mzz
> Priority: Major
>
> Flink version: 1.10.0
> Kafka version: 2.2
> *code:*
> {code:java}
> private def producerKafka(aggs_result: DataStream[String], topic: String, parallelism: Int) = {
>   val kafkaPro = new Properties()
>   kafkaPro.setProperty("bootstrap.servers", SINK_BROKERS)
>   kafkaPro.setProperty("zookeeper.connect", SINK_ZK)
>   kafkaPro.setProperty("request.timeout.ms", "1")
>   kafkaPro.setProperty("compression.type", "snappy")
>   kafkaPro.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "")
>   // With the retries parameter set, when a Kafka partition leader switch occurs, Flink does not restart but makes 5 attempts:
>   kafkaPro.setProperty(ProducerConfig.RETRIES_CONFIG, "5")
>   val kafka = new FlinkKafkaProducer[String](topic, new ResultDtSerialization(topic), kafkaPro, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
>   aggs_result.addSink(kafka).setParallelism(parallelism)
> }
> {code}
> *When I use this code to produce to Kafka, it reports an error:*
> {code:java}
> 2020-07-13 10:25:47,624 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Pending record count must be zero at this point: 1
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
>   at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:651)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:562)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:480)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Pending record count must be zero at this point: 1
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
>   ... 8 more
> {code}
[jira] [Commented] (FLINK-18575) Failed to send data to Kafka
[ https://issues.apache.org/jira/browse/FLINK-18575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17156456#comment-17156456 ]

mzz commented on FLINK-18575:
---
Additional information: the application keeps running and messages are produced normally, but the log always reports this error.

> Failed to send data to Kafka
> ---
>
> Key: FLINK-18575
> URL: https://issues.apache.org/jira/browse/FLINK-18575
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.10.0
> Reporter: mzz
> Priority: Major
>
> Flink version: 1.10.0
> Kafka version: 2.2
> *code:*
> {code:java}
> private def producerKafka(aggs_result: DataStream[String], topic: String, parallelism: Int) = {
>   val kafkaPro = new Properties()
>   kafkaPro.setProperty("bootstrap.servers", SINK_BROKERS)
>   kafkaPro.setProperty("zookeeper.connect", SINK_ZK)
>   kafkaPro.setProperty("request.timeout.ms", "1")
>   kafkaPro.setProperty("compression.type", "snappy")
>   kafkaPro.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "")
>   // With the retries parameter set, when a Kafka partition leader switch occurs, Flink does not restart but makes 5 attempts:
>   kafkaPro.setProperty(ProducerConfig.RETRIES_CONFIG, "5")
>   val kafka = new FlinkKafkaProducer[String](topic, new ResultDtSerialization(topic), kafkaPro, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
>   aggs_result.addSink(kafka).setParallelism(parallelism)
> }
> {code}
> *When I use this code to produce to Kafka, it reports an error:*
> {code:java}
> 2020-07-13 10:25:47,624 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Pending record count must be zero at this point: 1
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
>   at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:651)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:562)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:480)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Pending record count must be zero at this point: 1
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
>   ... 8 more
> {code}
[jira] [Created] (FLINK-18575) Failed to send data to Kafka
mzz created FLINK-18575:
---

Summary: Failed to send data to Kafka
Key: FLINK-18575
URL: https://issues.apache.org/jira/browse/FLINK-18575
Project: Flink
Issue Type: Bug
Components: API / DataStream
Affects Versions: 1.10.0
Reporter: mzz

Flink version: 1.10.0
Kafka version: 2.2
*code:*
{code:java}
private def producerKafka(aggs_result: DataStream[String], topic: String, parallelism: Int) = {
  val kafkaPro = new Properties()
  kafkaPro.setProperty("bootstrap.servers", SINK_BROKERS)
  kafkaPro.setProperty("zookeeper.connect", SINK_ZK)
  kafkaPro.setProperty("request.timeout.ms", "1")
  kafkaPro.setProperty("compression.type", "snappy")
  kafkaPro.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "")
  // With the retries parameter set, when a Kafka partition leader switch occurs, Flink does not restart but makes 5 attempts:
  kafkaPro.setProperty(ProducerConfig.RETRIES_CONFIG, "5")
  val kafka = new FlinkKafkaProducer[String](topic, new ResultDtSerialization(topic), kafkaPro, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
  aggs_result.addSink(kafka).setParallelism(parallelism)
}
{code}
*When I use this code to produce to Kafka, it reports an error:*
{code:java}
2020-07-13 10:25:47,624 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Pending record count must be zero at this point: 1
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
  at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
  at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:651)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:562)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:480)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Pending record count must be zero at this point: 1
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
  ... 8 more
{code}
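The time-related producer settings in the snippet above are easier to read once they are resolved to plain numbers. The sketch below rebuilds only those three properties with `java.util.Properties` and prints them in milliseconds. The class `ProducerTimeouts` and its methods are hypothetical, and the key `"retries"` is written out literally in place of `ProducerConfig.RETRIES_CONFIG` so that the example needs no Kafka dependency.

```java
import java.util.Properties;

class ProducerTimeouts {
    // Rebuilds the timeout and retry settings exactly as written in the report.
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("request.timeout.ms", "1");
        // Integer arithmetic happens first, then string concatenation: "300000".
        props.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "");
        props.setProperty("retries", "5");
        return props;
    }

    // Reads a property and parses it as a number.
    static long asLong(Properties props, String key) {
        return Long.parseLong(props.getProperty(key));
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println("request.timeout.ms     = " + asLong(props, "request.timeout.ms") + " ms");
        System.out.println("transaction.timeout.ms = " + asLong(props, "transaction.timeout.ms") + " ms");
        System.out.println("retries                = " + asLong(props, "retries"));
    }
}
```

Both timeouts are expressed in milliseconds, so the values in the report correspond to a request timeout of one millisecond and a transaction timeout of five minutes. The report does not say whether the one-millisecond value was intended, and the reporter later attributed the error to Kafka itself.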
[jira] [Comment Edited] (FLINK-18511) Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
[ https://issues.apache.org/jira/browse/FLINK-18511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17153290#comment-17153290 ]

mzz edited comment on FLINK-18511 at 7/8/20, 10:16 AM:
---
[~jark] It works now, thanks.
{code:java}
ts AS TO_TIMESTAMP(FROM_UNIXTIME(server_timestamp, 'yyyy-MM-dd HH:mm:ss'))
{code}

was (Author: mzz_q):
[~jark] Sorry, can you show me how to use this function? This excerpt is from the Flink website:
{code:java}
Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') under the session time zone (specified by TableConfig) to a timestamp.
{code}
My server_timestamp looks like '1593737902'. Thanks.

> Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> ---
>
> Key: FLINK-18511
> URL: https://issues.apache.org/jira/browse/FLINK-18511
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.10.0
> Reporter: mzz
> Priority: Critical
>
> I use a Schema like this:
> {code:java}
> val schema = new Schema()
>   .field("rowtimes", DataTypes.TIME(3)).rowtime(new Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))
> {code}
> sql:
> {code:java}
> val groupSql =
>   """
>     |select access,
>     |count(access),
>     |TUMBLE_END(rowtimes,INTERVAL '10' second)
>     |from log_test_table
>     |group by
>     |access,TUMBLE_END(rowtimes,INTERVAL '10' second)
>   """.stripMargin
> {code}
> The reported error:
> {code:java}
> Exception in thread "main" org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> If you think this function should be supported, you can create an issue and start a discussion for it.
> at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:191) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152) > at > 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:179) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84) > at >
[jira] [Issue Comment Deleted] (FLINK-18511) Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
[ https://issues.apache.org/jira/browse/FLINK-18511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

mzz updated FLINK-18511:
---
    Comment: was deleted

(was: [~jark] My code is this:
{code:java}
app_time BIGINT, -- 13-digit timestamp (1587975971)
ts AS TO_TIMESTAMP(FROM_UNIXTIME(app_time, 'yyyy-MM-dd HH:mm:ss')), -- define the event time
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND -- define a watermark on ts with a 5-second delay
{code}
It always reports this error:
{code:java}
org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "count ," at line 1, column 562.
{code}
)

> Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> ---
>
> Key: FLINK-18511
> URL: https://issues.apache.org/jira/browse/FLINK-18511
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.10.0
> Reporter: mzz
> Priority: Critical
>
> I use a Schema like this:
> {code:java}
> val schema = new Schema()
>   .field("rowtimes", DataTypes.TIME(3)).rowtime(new Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))
> {code}
> sql:
> {code:java}
> val groupSql =
>   """
>     |select access,
>     |count(access),
>     |TUMBLE_END(rowtimes,INTERVAL '10' second)
>     |from log_test_table
>     |group by
>     |access,TUMBLE_END(rowtimes,INTERVAL '10' second)
>   """.stripMargin
> {code}
> The reported error:
> {code:java}
> Exception in thread "main" org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> If you think this function should be supported, you can create an issue and start a discussion for it.
> at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:191) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152) > at > 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:179) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44) >
[jira] [Commented] (FLINK-18511) Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
[ https://issues.apache.org/jira/browse/FLINK-18511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17153458#comment-17153458 ]

mzz commented on FLINK-18511:
---
[~jark] My code is this:
{code:java}
app_time BIGINT, -- 13-digit timestamp (1587975971)
ts AS TO_TIMESTAMP(FROM_UNIXTIME(app_time, 'yyyy-MM-dd HH:mm:ss')), -- define the event time
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND -- define a watermark on ts with a 5-second delay
{code}
It always reports this error:
{code:java}
org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "count ," at line 1, column 562.
{code}

> Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> ---
>
> Key: FLINK-18511
> URL: https://issues.apache.org/jira/browse/FLINK-18511
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.10.0
> Reporter: mzz
> Priority: Critical
>
> I use a Schema like this:
> {code:java}
> val schema = new Schema()
>   .field("rowtimes", DataTypes.TIME(3)).rowtime(new Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))
> {code}
> sql:
> {code:java}
> val groupSql =
>   """
>     |select access,
>     |count(access),
>     |TUMBLE_END(rowtimes,INTERVAL '10' second)
>     |from log_test_table
>     |group by
>     |access,TUMBLE_END(rowtimes,INTERVAL '10' second)
>   """.stripMargin
> {code}
> The reported error:
> {code:java}
> Exception in thread "main" org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> If you think this function should be supported, you can create an issue and start a discussion for it.
> at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:191) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152) > at > 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:179) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84) > at >
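One possible reading of the parser error in this comment (`Encountered "count ,"`): `COUNT` is a reserved keyword in Flink SQL, so a column with that name has to be escaped with backticks wherever it appears in the DDL or the query. The sketch below is an assumption about the cause, not a confirmed diagnosis. The table name `events`, the helper `quoteIdent`, and the small `reserved` set are hypothetical and only illustrate the escaping.

```scala
object ReservedKeywordSketch {
  // Hypothetical, deliberately tiny subset of Flink SQL reserved keywords.
  val reserved: Set[String] = Set("COUNT", "TIMESTAMP", "VALUE")

  // Wrap an identifier in backticks when it collides with a reserved keyword.
  def quoteIdent(name: String): String =
    if (reserved.contains(name.toUpperCase)) s"`$name`" else name

  // Build "name TYPE" column definitions for a CREATE TABLE statement.
  def columnsDdl(columns: Seq[(String, String)]): String =
    columns.map { case (name, tpe) => s"  ${quoteIdent(name)} $tpe" }.mkString(",\n")

  def main(args: Array[String]): Unit = {
    // Column names here are illustrative; only `count` needs escaping.
    val cols = Seq("ip" -> "VARCHAR", "count" -> "VARCHAR", "app_time" -> "BIGINT")
    println(s"CREATE TABLE events (\n${columnsDdl(cols)}\n)")
  }
}
```

The same escaping already appears in the FLINK-18443 query in this archive, where the UNNEST alias list uses a backtick-quoted `count`.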
[jira] [Commented] (FLINK-18511) Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
[ https://issues.apache.org/jira/browse/FLINK-18511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17153290#comment-17153290 ]

mzz commented on FLINK-18511:
-----------------------------

[~jark] Sorry, can you show me how to use this function? This excerpt is from the Flink website:
{code:java}
Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') under the session time zone (specified by TableConfig) to a timestamp.
{code}
My server_timestamp looks like '1593737902'. Thanks.

> Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> ------------------------------------------------------------------
>
> Key: FLINK-18511
> URL: https://issues.apache.org/jira/browse/FLINK-18511
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.10.0
> Reporter: mzz
> Priority: Critical
>
> I use Schema like this :
> {code:java}
> val schema = new Schema()
> .field("rowtimes", DataTypes.TIME(3)).rowtime(new
> Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))
> {code}
> sql:
> {code:java}
> val groupSql =
> """
> |select access,
> |count(access),
> |TUMBLE_END(rowtimes,INTERVAL '10' second)
> |from log_test_table
> |group by
> |access,TUMBLE_END(rowtimes,INTERVAL '10' second)
> """.stripMargin
> {code}
> report errors:
> {code:java}
> Exception in thread "main"
> org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call:
> TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> If you think this function should be supported, you can create an issue and
> start a discussion for it.
> at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:191) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152) > at > 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:179) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44) > at >
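As a rough illustration of what the quoted function does for a value such as 1593737902: `FROM_UNIXTIME` turns epoch seconds into a formatted date-time string, and `TO_TIMESTAMP` parses such a string into a timestamp. The sketch below mimics that round trip with `java.time`. It is not Flink's implementation; the object and method names are hypothetical, and UTC is assumed where Flink would use the session time zone.

```scala
import java.time.{Instant, LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

object EpochToTimestampSketch {
  // Same pattern as the default format quoted from the Flink documentation.
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Analogue of FROM_UNIXTIME(seconds): epoch seconds -> formatted string (UTC assumed).
  def fromUnixtime(epochSeconds: Long): String =
    LocalDateTime.ofInstant(Instant.ofEpochSecond(epochSeconds), ZoneOffset.UTC).format(fmt)

  // Analogue of TO_TIMESTAMP(string): formatted string -> timestamp value.
  def toTimestamp(text: String): LocalDateTime = LocalDateTime.parse(text, fmt)

  def main(args: Array[String]): Unit = {
    val text = fromUnixtime(1593737902L)
    println(text)              // 2020-07-03 00:58:22
    println(toTimestamp(text)) // 2020-07-03T00:58:22
  }
}
```

A 13-digit value would be epoch milliseconds rather than seconds; dividing it by 1000 before the conversion gives the seconds-based input used here.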
[jira] [Commented] (FLINK-18511) Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
[ https://issues.apache.org/jira/browse/FLINK-18511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17153256#comment-17153256 ]

mzz commented on FLINK-18511:
-----------------------------

[~jark] I found the cause of this error: my `server_timestamp` looks like 1593737902, so I can't use TIMESTAMP(3) directly. So I wrote it like this: server_timestamp AS PROCTIME(). But it caused an error:
{code:java}
org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "count ," at line 1, column 562.
{code}
When I declare server_timestamp as VARCHAR, I can't define a watermark on it. How can I convert the type from VARCHAR to TIMESTAMP(3)? Thanks.

> Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> ------------------------------------------------------------------
>
> Key: FLINK-18511
> URL: https://issues.apache.org/jira/browse/FLINK-18511
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.10.0
> Reporter: mzz
> Priority: Critical
>
> I use Schema like this :
> {code:java}
> val schema = new Schema()
> .field("rowtimes", DataTypes.TIME(3)).rowtime(new
> Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))
> {code}
> sql:
> {code:java}
> val groupSql =
> """
> |select access,
> |count(access),
> |TUMBLE_END(rowtimes,INTERVAL '10' second)
> |from log_test_table
> |group by
> |access,TUMBLE_END(rowtimes,INTERVAL '10' second)
> """.stripMargin
> {code}
> report errors:
> {code:java}
> Exception in thread "main"
> org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call:
> TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> If you think this function should be supported, you can create an issue and
> start a discussion for it.
> at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:191) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152) > at > 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:179) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84) > at >
[jira] [Comment Edited] (FLINK-18511) Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
[ https://issues.apache.org/jira/browse/FLINK-18511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17153256#comment-17153256 ]

mzz edited comment on FLINK-18511 at 7/8/20, 6:13 AM:
------------------------------------------------------

[~jark] I found the cause of this error: my `server_timestamp` looks like 1593737902, so I can't use TIMESTAMP(3) directly. So I wrote it like this: server_timestamp AS PROCTIME(). But it caused an error:
{code:java}
org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "count ," at line 1, column 562.
{code}
When I declare server_timestamp as VARCHAR, I can't define a watermark on it. How can I convert the type from VARCHAR to TIMESTAMP(3)? Thanks.

was (Author: mzz_q):
[~jark] I finf this error, my `server_timestamp` like as 1593737902, cant use TIMESTAMP(3) directlly. So I coding like this, server_timestamp AS PROCTIME(). But it caused a error:
{code:java}
org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "count ," at line 1, column 562.
{code}
when I use server_timestamp VARCHAR, I cant use Watermark. How can I change type VARCHAR to TIMESTAMP(3), THX.

> Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> ------------------------------------------------------------------
>
> Key: FLINK-18511
> URL: https://issues.apache.org/jira/browse/FLINK-18511
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.10.0
> Reporter: mzz
> Priority: Critical
>
> I use Schema like this :
> {code:java}
> val schema = new Schema()
> .field("rowtimes", DataTypes.TIME(3)).rowtime(new
> Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))
> {code}
> sql:
> {code:java}
> val groupSql =
> """
> |select access,
> |count(access),
> |TUMBLE_END(rowtimes,INTERVAL '10' second)
> |from log_test_table
> |group by
> |access,TUMBLE_END(rowtimes,INTERVAL '10' second)
> """.stripMargin
> {code}
> report errors:
> {code:java}
> Exception in thread "main"
> org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call:
> TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> If you think this function
should be supported, you can create an issue and > start a discussion for it. > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:191) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:179) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) > at >
[jira] [Commented] (FLINK-18443) The operator name select: (ip, ts, count, environment.access AS access, environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the 80 characters lengt
[ https://issues.apache.org/jira/browse/FLINK-18443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17153207#comment-17153207 ]

mzz commented on FLINK-18443:
-----------------------------

I replaced Types with DataTypes, and the problem is solved.

> The operator name select: (ip, ts, count, environment.access AS access,
> environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the
> 80 characters length limit and was truncated
> ------------------------------------------------------------------
>
> Key: FLINK-18443
> URL: https://issues.apache.org/jira/browse/FLINK-18443
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.9.0
> Reporter: mzz
> Priority: Major
>
> *Schema:*
> {code:java}
> .withSchema(new Schema()
> .field("ip", Types.STRING())
> .field("ts", Types.STRING())
> .field("procTime", Types.SQL_TIMESTAMP).proctime()
> .field("environment", schemaEnvironment)
> .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,
> Types.ROW(Array("count","sid", "eventid","params"),
> Array[TypeInformation[_]](Types.STRING(),Types.STRING(),
> Types.STRING(),Types.ROW(Array("adid","adtype","ecpm"),Array[TypeInformation[_]]
> (Types.STRING(),Types.STRING(),Types.STRING()))
> )
> {code}
> *When I execute this SQL:*
> {code:java}
> val sql =
> """
> |SELECT
> |ip,
> |ts,
> |params.ad,
> |params.adtype,
> |eventid,
> |procTime
> |FROM aggs_test
> |CROSS JOIN UNNEST(advs) AS t (`count`,sid, eventid,params)
> |""".stripMargin
> {code}
> *I get a warning, and the console keeps printing this warning with no normal
> output:*
> {code:java}
> 09:38:38,694 WARN org.apache.flink.metrics.MetricGroup
> - The operator name correlate: table(explode($cor0.advs)), select: ip, ts,
> procTime, advs, sid, eventid, params exceeded the 80 characters length limit
> and was truncated.
> {code}
> *But after I changed it as follows, although the warning still appears
> occasionally, the output is printed normally. I changed the 'params' type from
> Types.ROW to Types.STRING.*
> {code:java}
> .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,
> Types.ROW(Array("count", "sid", "eventid", "params"),
> Array[TypeInformation[_]](Types.STRING(), Types.STRING(),
> Types.STRING(), Types.STRING()
> {code}
[jira] [Commented] (FLINK-18511) Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
[ https://issues.apache.org/jira/browse/FLINK-18511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17153205#comment-17153205 ]

mzz commented on FLINK-18511:
-----------------------------

Thanks, [~jark].
I used DDL to register the table. The CREATE TABLE SQL:
{code:java}
val createTableSql =
  """
    |CREATE TABLE aggs_test(
    | data_date VARCHAR,
    | data_min VARCHAR,
    | data_hour VARCHAR,
    | server_timestamp TIMESTAMP(3),
    | access VARCHAR
    | WATERMARK FOR server_timestamp AS server_timestamp - INTERVAL '5' SECOND
    |)
    |WITH(
    | 'connector.type' ='kafka',
    | 'connector.version' = 'universal',
    | 'connector.topic' = 'xxx',
    | 'connector.startup-mode' = 'earliest-offset',
    | 'connector.properties.0.key' = 'bootstrap.servers',
    | 'connector.properties.0.value' = 'xxx',
    | 'update-mode' = 'append',
    | 'format.type' = 'json',
    | 'format.derive-schema' = 'true'
    |)
  """.stripMargin
{code}
The GROUP BY SQL:
{code:java}
val groupSql =
  """
    |select access,
    |TUMBLE_START(server_timestamp,INTERVAL '10' second),
    |count(ad_id)
    |from aggs_test
    |group by
    |TUMBLE(server_timestamp ,INTERVAL '10' second),access
  """.stripMargin
{code}
The error:
{code:java}
WARN org.apache.flink.streaming.runtime.tasks.StreamTask - Error while canceling task.
org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
    at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
    at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:602)
    at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1355)
    at java.lang.Thread.run(Thread.java:748)
{code}

> Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> ------------------------------------------------------------------
>
> Key: FLINK-18511
> URL: https://issues.apache.org/jira/browse/FLINK-18511
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.10.0
> Reporter: mzz
> Priority: Critical
>
> I use Schema like this :
> {code:java}
> val schema = new Schema()
> .field("rowtimes", DataTypes.TIME(3)).rowtime(new
> Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))
> {code}
> sql:
> {code:java}
> val groupSql =
> """
> |select access,
> |count(access),
> |TUMBLE_END(rowtimes,INTERVAL '10' second)
> |from log_test_table
> |group by
> |access,TUMBLE_END(rowtimes,INTERVAL '10' second)
> """.stripMargin
> {code}
> report errors:
> {code:java}
> Exception in thread "main"
> org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call:
> TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> If you think this function should be supported, you can create an issue and
> start a discussion for it.
> at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:191) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152) > at >
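A possible cleaned-up variant of the two statements in this comment, offered as a sketch under assumptions rather than a verified fix. It adds a comma between the last column and the WATERMARK clause, declares the epoch-seconds field as BIGINT with a computed TIMESTAMP(3) column derived from it, and counts `access` because `ad_id` is not declared in the table. The computed-column name `ts` and the object name are hypothetical; the connector properties are copied from the comment.

```scala
object AggsTestDdlSketch {
  // Assumption: server_timestamp arrives as epoch seconds (e.g. 1593737902), so it is
  // declared BIGINT and a computed column `ts` (hypothetical name) carries the event time.
  val createTableSql: String =
    """
      |CREATE TABLE aggs_test(
      |  data_date VARCHAR,
      |  data_min VARCHAR,
      |  data_hour VARCHAR,
      |  server_timestamp BIGINT,
      |  access VARCHAR,
      |  ts AS TO_TIMESTAMP(FROM_UNIXTIME(server_timestamp)),
      |  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
      |)
      |WITH(
      |  'connector.type' = 'kafka',
      |  'connector.version' = 'universal',
      |  'connector.topic' = 'xxx',
      |  'connector.startup-mode' = 'earliest-offset',
      |  'connector.properties.0.key' = 'bootstrap.servers',
      |  'connector.properties.0.value' = 'xxx',
      |  'update-mode' = 'append',
      |  'format.type' = 'json',
      |  'format.derive-schema' = 'true'
      |)
    """.stripMargin.trim

  // The window is declared on the time attribute `ts`; only declared columns are aggregated.
  val groupSql: String =
    """
      |SELECT access,
      |  TUMBLE_START(ts, INTERVAL '10' SECOND) AS window_start,
      |  COUNT(access) AS cnt
      |FROM aggs_test
      |GROUP BY
      |  TUMBLE(ts, INTERVAL '10' SECOND), access
    """.stripMargin.trim

  def main(args: Array[String]): Unit = {
    println(createTableSql)
    println(groupSql)
  }
}
```

The `Handover$ClosedException` in the log is raised while the Kafka source is being cancelled, so it may be a side effect of an earlier failure rather than the root cause; the first exception in the job log is usually the more informative one.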
[jira] [Commented] (FLINK-18511) Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
[ https://issues.apache.org/jira/browse/FLINK-18511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17152617#comment-17152617 ]

mzz commented on FLINK-18511:
-----------------------------

[~jark] My CREATE TABLE SQL:
{code:java}
val createTableSql =
  """
    |CREATE TABLE aggs_test(
    | data_date VARCHAR,
    | data_min VARCHAR,
    | data_hour VARCHAR,
    | server_timestamp BIGINT,
    | access VARCHAR,
    | appversion VARCHAR
    |)
    |WITH(
    | 'connector.type' ='kafka',
    | 'connector.version' = 'universal',
    | 'connector.topic' = 'xxx',
    | 'connector.startup-mode' = 'earliest-offset',
    | 'connector.properties.0.key' = 'bootstrap.servers',
    | 'connector.properties.0.value' = '',
    | 'update-mode' = 'append',
    | 'format.type' = 'json',
    | 'format.derive-schema' = 'true'
    |)
  """.stripMargin
{code}

> Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> ------------------------------------------------------------------
>
> Key: FLINK-18511
> URL: https://issues.apache.org/jira/browse/FLINK-18511
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.10.0
> Reporter: mzz
> Priority: Critical
>
> I use Schema like this :
> {code:java}
> val schema = new Schema()
> .field("rowtimes", DataTypes.TIME(3)).rowtime(new
> Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))
> {code}
> sql:
> {code:java}
> val groupSql =
> """
> |select access,
> |count(access),
> |TUMBLE_END(rowtimes,INTERVAL '10' second)
> |from log_test_table
> |group by
> |access,TUMBLE_END(rowtimes,INTERVAL '10' second)
> """.stripMargin
> {code}
> report errors:
> {code:java}
> Exception in thread "main"
> org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call:
> TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> If you think this function should be supported, you can create an issue and
> start a discussion for it.
> at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:191) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152) > at > 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:179) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at >
[jira] [Commented] (FLINK-18511) Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
[ https://issues.apache.org/jira/browse/FLINK-18511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17152615#comment-17152615 ]

mzz commented on FLINK-18511:
-----------------------------

[~jark] Thanks, Jark Wu. I tried using DDL to register the table, but the error still occurs. My SQL is very simple:
{code:java}
val groupSql =
  """
    |select access,
    |count(access),
    |TUMBLE_END(CAST(server_timestamp as TIMESTAMP(3)),INTERVAL '10' second)
    |from aggs_test
    |group by
    |access,TUMBLE_END(CAST(server_timestamp as TIMESTAMP(3)),INTERVAL '10' second)
  """.stripMargin
{code}
The error:
{code:java}
Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: Unsupported call: TUMBLE_END
If you think this function should be supported, you can create an issue and start a discussion for it.
{code}

> Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> ------------------------------------------------------------------
>
> Key: FLINK-18511
> URL: https://issues.apache.org/jira/browse/FLINK-18511
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.10.0
> Reporter: mzz
> Priority: Critical
>
> I use Schema like this :
> {code:java}
> val schema = new Schema()
> .field("rowtimes", DataTypes.TIME(3)).rowtime(new
> Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))
> {code}
> sql:
> {code:java}
> val groupSql =
> """
> |select access,
> |count(access),
> |TUMBLE_END(rowtimes,INTERVAL '10' second)
> |from log_test_table
> |group by
> |access,TUMBLE_END(rowtimes,INTERVAL '10' second)
> """.stripMargin
> {code}
> report errors:
> {code:java}
> Exception in thread "main"
> org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call:
> TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> If you think this function should be supported, you can create an issue and
> start a discussion for it.
> at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:191) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152) > at > 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:179) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) > at >
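In Flink SQL's group-window syntax, the window itself is declared with `TUMBLE(...)` in the GROUP BY clause, while `TUMBLE_START` and `TUMBLE_END` are auxiliary functions for the SELECT list. The time argument is also expected to be a time attribute (declared through WATERMARK or PROCTIME()), not an arbitrary CAST expression. The sketch below builds the query in this comment along those lines. The helper name is hypothetical, and the time-attribute column `ts` is an assumption, since the DDL posted in this thread does not declare one.

```scala
object TumbleQuerySketch {
  // Build a tumbling-window count query: TUMBLE(...) in GROUP BY, TUMBLE_END(...) in SELECT.
  def tumbleCountSql(table: String, key: String, timeCol: String, seconds: Int): String = {
    val window = s"$timeCol, INTERVAL '$seconds' SECOND"
    s"""
       |SELECT $key,
       |  COUNT($key) AS cnt,
       |  TUMBLE_END($window) AS window_end
       |FROM $table
       |GROUP BY
       |  $key, TUMBLE($window)
     """.stripMargin.trim
  }

  def main(args: Array[String]): Unit = {
    // `ts` is assumed to be declared as an event-time attribute in the table DDL.
    println(tumbleCountSql("aggs_test", "access", "ts", 10))
  }
}
```

Both window calls take the same column and interval, which is how the planner matches the auxiliary function in the SELECT list to the window declared in GROUP BY.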
[jira] [Commented] (FLINK-18511) Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
[ https://issues.apache.org/jira/browse/FLINK-18511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17152544#comment-17152544 ]

mzz commented on FLINK-18511:
-----------------------------

[~jark] Thanks for your reply. I changed the schema field to:
{code:java}
.field("rowtimes", DataTypes.TIMESTAMP(3)).rowtime(new Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))
{code}
but it still reports an error:
{code:java}
Exception in thread "main" org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: TUMBLE_END(TIMESTAMP(3), INTERVAL SECOND(3) NOT NULL)
If you think this function should be supported, you can create an issue and start a discussion for it.
{code}

> Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> ------------------------------------------------------------------
>
> Key: FLINK-18511
> URL: https://issues.apache.org/jira/browse/FLINK-18511
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.10.0
> Reporter: mzz
> Priority: Critical
>
> I use Schema like this :
> {code:java}
> val schema = new Schema()
> .field("rowtimes", DataTypes.TIME(3)).rowtime(new
> Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))
> {code}
> sql:
> {code:java}
> val groupSql =
> """
> |select access,
> |count(access),
> |TUMBLE_END(rowtimes,INTERVAL '10' second)
> |from log_test_table
> |group by
> |access,TUMBLE_END(rowtimes,INTERVAL '10' second)
> """.stripMargin
> {code}
> report errors:
> {code:java}
> Exception in thread "main"
> org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call:
> TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> If you think this function should be supported, you can create an issue and
> start a discussion for it.
> at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:191) > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152) > at > 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:179) > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84) > at >
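A possible reading of the error above, offered as an assumption and not as a confirmed diagnosis: in Flink SQL the group window is declared with TUMBLE(...) in the GROUP BY clause, and TUMBLE_END(...) is an auxiliary function used in the SELECT list with the same arguments. The reported query groups by TUMBLE_END itself, which may be why the call reaches code generation and is rejected even after the field is changed to TIMESTAMP(3). The sketch below only assembles the rewritten query text in plain Java; the class name, method name and the aliases cnt and window_end are hypothetical, while the table and column names come from the report.

```java
class TumbleQuerySketch {

    // Builds a tumbling-window count query.
    // TUMBLE(...) in GROUP BY defines the window; TUMBLE_END(...) in SELECT
    // reads the end timestamp of that same window, so both get identical arguments.
    public static String tumbleCountQuery(String table, String keyColumn,
                                          String rowtimeColumn, int windowSeconds) {
        String interval = "INTERVAL '" + windowSeconds + "' SECOND";
        return String.join("\n",
            "SELECT " + keyColumn + ",",
            "  COUNT(" + keyColumn + ") AS cnt,",
            "  TUMBLE_END(" + rowtimeColumn + ", " + interval + ") AS window_end",
            "FROM " + table,
            "GROUP BY " + keyColumn + ", TUMBLE(" + rowtimeColumn + ", " + interval + ")");
    }

    public static void main(String[] args) {
        // Names taken from the report: log_test_table, access, rowtimes, 10-second window.
        System.out.println(tumbleCountQuery("log_test_table", "access", "rowtimes", 10));
    }
}
```

The resulting string could then be passed to sqlQuery in place of groupSql; whether that resolves the exception on 1.10.0 would still need to be verified against the actual job.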
[jira] [Created] (FLINK-18511) Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
mzz created FLINK-18511: --- Summary: Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL) Key: FLINK-18511 URL: https://issues.apache.org/jira/browse/FLINK-18511 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.10.0 Reporter: mzz I use Schema like this : {code:java} val schema = new Schema() .field("rowtimes", DataTypes.TIME(3)).rowtime(new Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000)) {code} sql: {code:java} val groupSql = """ |select access, |count(access), |TUMBLE_END(rowtimes,INTERVAL '10' second) |from log_test_table |group by |access,TUMBLE_END(rowtimes,INTERVAL '10' second) """.stripMargin {code} report errors: {code:java} Exception in thread "main" org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL) If you think this function should be supported, you can create an issue and start a discussion for it. at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792) at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792) at scala.Option.getOrElse(Option.scala:121) at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791) at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796) at scala.Option.getOrElse(Option.scala:121) at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785) at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485) at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51) at org.apache.calcite.rex.RexCall.accept(RexCall.java:191) at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131) at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152) at org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152) at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:179) at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:139) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:55) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at
[jira] [Commented] (FLINK-18443) The operator name select: (ip, ts, count, environment.access AS access, environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the 80 characters lengt
[ https://issues.apache.org/jira/browse/FLINK-18443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17147494#comment-17147494 ] mzz commented on FLINK-18443: - I tried reading the source code, but it didn't help, because the parameter METRICS_OPERATOR_NAME_MAX_LENGTH is final. {code:java} static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80; public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) { if (name != null && name.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) { LOG.warn("The operator name {} exceeded the {} characters length limit and was truncated.", name, METRICS_OPERATOR_NAME_MAX_LENGTH); name = name.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH); } OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, operatorID, name); // unique OperatorIDs only exist in streaming, so we have to rely on the name for batch operators final String key = operatorID + name; synchronized (this) { OperatorMetricGroup previous = operators.put(key, operator); if (previous == null) { // no operator group so far return operator; } else { // already had an operator group. restore that one. 
operators.put(key, previous); return previous; } } } {code} > The operator name select: (ip, ts, count, environment.access AS access, > environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the > 80 characters length limit and was truncated > > > Key: FLINK-18443 > URL: https://issues.apache.org/jira/browse/FLINK-18443 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.9.0 >Reporter: mzz >Priority: Major > > *Schema:* > {code:java} > .withSchema(new Schema() > .field("ip", Types.STRING()) > .field("ts", Types.STRING()) > .field("procTime", Types.SQL_TIMESTAMP).proctime() > .field("environment", schemaEnvironment) > .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass, > Types.ROW(Array("count","sid", "eventid","params"), > Array[TypeInformation[_]](Types.STRING(),Types.STRING(), > > Types.STRING(),Types.ROW(Array("adid","adtype","ecpm"),Array[TypeInformation[_]] > (Types.STRING(),Types.STRING(),Types.STRING())) > ) > {code} > *when execute this sql*: > {code:java} > val sql = > """ > |SELECT > |ip, > |ts, > |params.ad, > |params.adtype, > |eventid, > |procTime > |FROM aggs_test > |CROSS JOIN UNNEST(advs) AS t (`count`,sid, eventid,params) > |""".stripMargin > {code} > *I got a warning,and the console keeps brushing this warning,no normal > printout* > {code:java} > 09:38:38,694 WARN org.apache.flink.metrics.MetricGroup >- The operator name correlate: table(explode($cor0.advs)), select: ip, ts, > procTime, advs, sid, eventid, params exceeded the 80 characters length limit > and was truncated. 
> {code} > *But after I change it to this way, although I occasionally brush this Warn, > it can be output normally。I change the 'params' type from Types.ROW to > Types.STRING*。 > {code:java} > .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass, > Types.ROW(Array("count", "sid", "eventid", "params"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING(), > Types.STRING(), Types.STRING() > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
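Reading the quoted getOrAddOperator method, the warning appears to concern only the name under which the operator's metric group is registered: a name longer than 80 characters is cut with substring and the group is then created as usual. The sketch below reproduces just that truncation step in plain Java so it can be run without Flink. The class and method names are hypothetical and this is not the Flink implementation. If this reading is right, the missing output likely has another cause than the warning itself.

```java
class OperatorNameTruncationSketch {

    // Same value as METRICS_OPERATOR_NAME_MAX_LENGTH in the quoted source.
    static final int MAX_LENGTH = 80;

    // Returns the name unchanged when it fits (or is null),
    // otherwise only its first MAX_LENGTH characters.
    public static String truncate(String name) {
        if (name != null && name.length() > MAX_LENGTH) {
            return name.substring(0, MAX_LENGTH);
        }
        return name;
    }

    public static void main(String[] args) {
        // Operator name copied from the warning in the report.
        String name = "select: (ip, ts, count, environment.access AS access, "
            + "environment.brand AS brand, sid, params.adid AS adid, eventid)";
        String shortened = truncate(name);
        System.out.println(name.length() + " -> " + shortened.length());
        System.out.println(shortened);
    }
}
```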
[jira] [Commented] (FLINK-18437) org.apache.calcite.sql.validate.SqlValidatorException: List of column aliases must have same degree as table
[ https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17147492#comment-17147492 ] mzz commented on FLINK-18437: - *I tried reading the source code, but it didn't help, because METRICS_OPERATOR_NAME_MAX_LENGTH is final.* {code:java} static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80; public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) { if (name != null && name.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) { LOG.warn("The operator name {} exceeded the {} characters length limit and was truncated.", name, METRICS_OPERATOR_NAME_MAX_LENGTH); name = name.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH); } OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, operatorID, name); // unique OperatorIDs only exist in streaming, so we have to rely on the name for batch operators final String key = operatorID + name; synchronized (this) { OperatorMetricGroup previous = operators.put(key, operator); if (previous == null) { // no operator group so far return operator; } else { // already had an operator group. restore that one. 
operators.put(key, previous); return previous; } } } {code} > org.apache.calcite.sql.validate.SqlValidatorException: List of column aliases > must have same degree as table > > > Key: FLINK-18437 > URL: https://issues.apache.org/jira/browse/FLINK-18437 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.9.3 >Reporter: mzz >Priority: Critical > > .withSchema(new Schema() > .field("ip", Types.STRING()) > .field("ts", Types.STRING()) > .field("environment", Types.ROW(Array("access", "brand"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING))) > .field("advs", ObjectArrayTypeInfo.getInfoFor(new > Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING > ) > .inAppendMode() > .registerTableSource("aggs_test") > The code above is dataSchema,i tried this way > [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but > when i execute this sql: > val sql1 = > """ > |SELECT > |ip, > |ts, > |environment, > |adv > |FROM aggs_test > |CROSS JOIN UNNEST(advs) AS t (adv) > |""".stripMargin > It report an error: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > SQL validation failed. 
From line 8, column 31 to line 8, column 33: List of > column aliases must have same degree as table; table has 2 columns ('access', > 'brand'), whereas alias list has 1 columns > at > org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128) > at > org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83) > at > org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala) > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, > column 31 to line 8, column 33: List of column aliases must have same degree > as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 > columns > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809) > at >
[jira] [Created] (FLINK-18443) The operator name select: (ip, ts, count, environment.access AS access, environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the 80 characters length
mzz created FLINK-18443: --- Summary: The operator name select: (ip, ts, count, environment.access AS access, environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the 80 characters length limit and was truncated Key: FLINK-18443 URL: https://issues.apache.org/jira/browse/FLINK-18443 Project: Flink Issue Type: Bug Components: API / DataStream Affects Versions: 1.9.0 Reporter: mzz *Schema:* {code:java} .withSchema(new Schema() .field("ip", Types.STRING()) .field("ts", Types.STRING()) .field("procTime", Types.SQL_TIMESTAMP).proctime() .field("environment", schemaEnvironment) .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass, Types.ROW(Array("count","sid", "eventid","params"), Array[TypeInformation[_]](Types.STRING(),Types.STRING(), Types.STRING(),Types.ROW(Array("adid","adtype","ecpm"),Array[TypeInformation[_]] (Types.STRING(),Types.STRING(),Types.STRING())) ) {code} *When I execute this SQL*: {code:java} val sql = """ |SELECT |ip, |ts, |params.ad, |params.adtype, |eventid, |procTime |FROM aggs_test |CROSS JOIN UNNEST(advs) AS t (`count`,sid, eventid,params) |""".stripMargin {code} *I get a warning; the console keeps printing it and there is no normal output* {code:java} 09:38:38,694 WARN org.apache.flink.metrics.MetricGroup - The operator name correlate: table(explode($cor0.advs)), select: ip, ts, procTime, advs, sid, eventid, params exceeded the 80 characters length limit and was truncated. {code} *After I changed the 'params' type from Types.ROW to Types.STRING, the warning still appears occasionally, but the output is normal.* {code:java} .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass, Types.ROW(Array("count", "sid", "eventid", "params"), Array[TypeInformation[_]](Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING() {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-18437) org.apache.calcite.sql.validate.SqlValidatorException: List of column aliases must have same degree as table
[ https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mzz updated FLINK-18437: Summary: org.apache.calcite.sql.validate.SqlValidatorException: List of column aliases must have same degree as table (was: Error message is not correct when using UNNEST) > org.apache.calcite.sql.validate.SqlValidatorException: List of column aliases > must have same degree as table > > > Key: FLINK-18437 > URL: https://issues.apache.org/jira/browse/FLINK-18437 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.9.3 >Reporter: mzz >Priority: Critical > > .withSchema(new Schema() > .field("ip", Types.STRING()) > .field("ts", Types.STRING()) > .field("environment", Types.ROW(Array("access", "brand"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING))) > .field("advs", ObjectArrayTypeInfo.getInfoFor(new > Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING > ) > .inAppendMode() > .registerTableSource("aggs_test") > The code above is dataSchema,i tried this way > [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but > when i execute this sql: > val sql1 = > """ > |SELECT > |ip, > |ts, > |environment, > |adv > |FROM aggs_test > |CROSS JOIN UNNEST(advs) AS t (adv) > |""".stripMargin > It report an error: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > SQL validation failed. 
From line 8, column 31 to line 8, column 33: List of > column aliases must have same degree as table; table has 2 columns ('access', > 'brand'), whereas alias list has 1 columns > at > org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128) > at > org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83) > at > org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala) > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, > column 31 to line 8, column 33: List of column aliases must have same degree > as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 > columns > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4807) > at > org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:86) > at > org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997) > at > 
org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115) > at > org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:41) > at > org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101) > at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:191) > at > org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:156) > at > org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5699) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5684) > at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5291) > at >
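The validator message says that the alias list after UNNEST must have the same number of names as the unnested row has columns. As a sketch, assuming advs is an array of rows with the two fields count and eventid as in the schema quoted above, the alias list would need two names instead of the single adv. The helper below is hypothetical and only assembles the SQL text in plain Java; count is written with backticks because the reporter's other queries quote it that way.

```java
import java.util.Arrays;
import java.util.List;

class UnnestAliasSketch {

    // Builds "CROSS JOIN UNNEST(<array>) AS t (<aliases>)" with exactly one
    // alias per field of the row type stored in the array column.
    public static String unnestClause(String arrayColumn, List<String> rowFields) {
        if (rowFields.isEmpty()) {
            throw new IllegalArgumentException("the unnested row needs at least one field");
        }
        return "CROSS JOIN UNNEST(" + arrayColumn + ") AS t ("
            + String.join(", ", rowFields) + ")";
    }

    public static void main(String[] args) {
        // Two aliases for the two row fields of advs in the reported schema.
        String sql = String.join("\n",
            "SELECT ip, ts, environment, `count`, eventid",
            "FROM aggs_test",
            unnestClause("advs", Arrays.asList("`count`", "eventid")));
        System.out.println(sql);
    }
}
```

Deriving the alias list from the same field names used to declare the row keeps the two in step when the schema grows, for example to the four fields count, sid, eventid and params used in the later comments.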
[jira] [Comment Edited] (FLINK-18437) Error message is not correct when using UNNEST
[ https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17147193#comment-17147193 ] mzz edited comment on FLINK-18437 at 6/28/20, 9:16 AM: --- *Schema:* .withSchema(new Schema() .field("ip", Types.STRING()) .field("ts", Types.STRING()) .field("environment", schemaEnvironment) .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass, Types.ROW(Array("count","sid", "eventid","params"), Array[TypeInformation[_]](Types.STRING(),Types.STRING(), Types.STRING(),Types.ROW(Array("adid","adtype","ecpm"),Array[TypeInformation[_]](Types.STRING(),Types.STRING(),Types.STRING())) ) when execute this sql: {code:java} val sql = """ |SELECT |ip, |ts, |`count`, |environment.access, |environment.brand, |sid, |params.adid, |eventid |FROM aggs_test |CROSS JOIN UNNEST(advs) AS t (`count`,sid, eventid,params) |""".stripMargin {code} *log:* 11:31:54,379 WARN org.apache.flink.metrics.MetricGroup - The operator name select: (ip, ts, count, environment.access AS access, environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the 80 characters length limit and was truncated. 
*The console keeps printing this warning and the job fails to produce normal output. Which parameter should I modify? Thanks.* was (Author: mzz_q): *Schema:* .withSchema(new Schema() .field("ip", Types.STRING()) .field("ts", Types.STRING()) .field("environment", schemaEnvironment) .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass, Types.ROW(Array("count","sid", "eventid","params"), Array[TypeInformation[_]](Types.STRING(),Types.STRING(), Types.STRING(),Types.ROW(Array("adid","adtype","ecpm"),Array[TypeInformation[_]](Types.STRING(),Types.STRING(),Types.STRING())) ) when execute this sql: {code:java} val sql = """ |SELECT |ip, |ts, |`count`, |environment.access, |environment.brand, |sid, |params.adid, |eventid |FROM aggs_test |CROSS JOIN UNNEST(advs) AS t (`count`,sid, eventid,params) |""".stripMargin {code} *log:* 11:31:54,379 WARN org.apache.flink.metrics.MetricGroup - The operator name select: (ip, ts, count, environment.access AS access, environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the 80 characters length limit and was truncated. 
*This warning caused fail to output normally,Which parameter should I modify。THX。* > Error message is not correct when using UNNEST > -- > > Key: FLINK-18437 > URL: https://issues.apache.org/jira/browse/FLINK-18437 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 1.9.3 >Reporter: mzz >Priority: Critical > > .withSchema(new Schema() > .field("ip", Types.STRING()) > .field("ts", Types.STRING()) > .field("environment", Types.ROW(Array("access", "brand"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING))) > .field("advs", ObjectArrayTypeInfo.getInfoFor(new > Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING > ) > .inAppendMode() > .registerTableSource("aggs_test") > The code above is dataSchema,i tried this way > [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but > when i execute this sql: > val sql1 = > """ > |SELECT > |ip, > |ts, > |environment, > |adv > |FROM aggs_test > |CROSS JOIN UNNEST(advs) AS t (adv) > |""".stripMargin > It report an error: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > SQL validation failed. From line 8, column 31 to line 8, column 33: List of > column aliases must have same degree as table; table has 2 columns ('access', > 'brand'), whereas alias list has 1 columns > at > org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128) > at > org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83) > at > org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala) >
[jira] [Updated] (FLINK-18437) Error message is not correct when using UNNEST
[ https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mzz updated FLINK-18437: Issue Type: Bug (was: Improvement) > Error message is not correct when using UNNEST > -- > > Key: FLINK-18437 > URL: https://issues.apache.org/jira/browse/FLINK-18437 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.9.3 >Reporter: mzz >Priority: Critical > > .withSchema(new Schema() > .field("ip", Types.STRING()) > .field("ts", Types.STRING()) > .field("environment", Types.ROW(Array("access", "brand"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING))) > .field("advs", ObjectArrayTypeInfo.getInfoFor(new > Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING > ) > .inAppendMode() > .registerTableSource("aggs_test") > The code above is dataSchema,i tried this way > [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but > when i execute this sql: > val sql1 = > """ > |SELECT > |ip, > |ts, > |environment, > |adv > |FROM aggs_test > |CROSS JOIN UNNEST(advs) AS t (adv) > |""".stripMargin > It report an error: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > SQL validation failed. 
From line 8, column 31 to line 8, column 33: List of > column aliases must have same degree as table; table has 2 columns ('access', > 'brand'), whereas alias list has 1 columns > at > org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128) > at > org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83) > at > org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala) > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, > column 31 to line 8, column 33: List of column aliases must have same degree > as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 > columns > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4807) > at > org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:86) > at > org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997) > at > 
org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115) > at > org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:41) > at > org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101) > at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:191) > at > org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:156) > at > org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5699) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5684) > at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5291) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3126) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3108) > at >
[jira] [Comment Edited] (FLINK-18437) Error message is not correct when using UNNEST
[ https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17147193#comment-17147193 ] mzz edited comment on FLINK-18437 at 6/28/20, 9:11 AM: --- *Schema:* .withSchema(new Schema() .field("ip", Types.STRING()) .field("ts", Types.STRING()) .field("environment", schemaEnvironment) .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass, Types.ROW(Array("count","sid", "eventid","params"), Array[TypeInformation[_]](Types.STRING(),Types.STRING(), Types.STRING(),Types.ROW(Array("adid","adtype","ecpm"),Array[TypeInformation[_]](Types.STRING(),Types.STRING(),Types.STRING())) ) when execute this sql: {code:java} val sql = """ |SELECT |ip, |ts, |`count`, |environment.access, |environment.brand, |sid, |params.adid, |eventid |FROM aggs_test |CROSS JOIN UNNEST(advs) AS t (`count`,sid, eventid,params) |""".stripMargin {code} *log:* 11:31:54,379 WARN org.apache.flink.metrics.MetricGroup - The operator name select: (ip, ts, count, environment.access AS access, environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the 80 characters length limit and was truncated. 
*This warning causes the job to fail to output normally. Which parameter should I modify? Thanks.* was (Author: mzz_q): *Schema:* .withSchema(new Schema() .field("ip", Types.STRING()) .field("ts", Types.STRING()) .field("environment", schemaEnvironment) .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass, Types.ROW(Array("count","sid", "eventid","params"), Array[TypeInformation[_]](Types.STRING(),Types.STRING(), Types.STRING(),Types.ROW(Array("adid","adtype","ecpm"),Array[TypeInformation[_]](Types.STRING(),Types.STRING(),Types.STRING())) ) when execute this sql: {code:java} val sql = """ |SELECT |ip, |ts, |`count`, |environment.access, |environment.brand, |sid, |params.adid, |eventid |FROM aggs_test |CROSS JOIN UNNEST(advs) AS t (`count`,sid, eventid,params) |""".stripMargin {code} *log:* 11:31:54,379 WARN org.apache.flink.metrics.MetricGroup - The operator name select: (ip, ts, count, environment.access AS access, environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the 80 characters length limit and was truncated. 
*Which parameter should I modify。THX。 * > Error message is not correct when using UNNEST > -- > > Key: FLINK-18437 > URL: https://issues.apache.org/jira/browse/FLINK-18437 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 1.9.3 >Reporter: mzz >Priority: Critical > > .withSchema(new Schema() > .field("ip", Types.STRING()) > .field("ts", Types.STRING()) > .field("environment", Types.ROW(Array("access", "brand"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING))) > .field("advs", ObjectArrayTypeInfo.getInfoFor(new > Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING > ) > .inAppendMode() > .registerTableSource("aggs_test") > The code above is dataSchema,i tried this way > [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but > when i execute this sql: > val sql1 = > """ > |SELECT > |ip, > |ts, > |environment, > |adv > |FROM aggs_test > |CROSS JOIN UNNEST(advs) AS t (adv) > |""".stripMargin > It report an error: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > SQL validation failed. 
From line 8, column 31 to line 8, column 33: List of > column aliases must have same degree as table; table has 2 columns ('access', > 'brand'), whereas alias list has 1 columns > at > org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128) > at > org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83) > at > org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala) > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8,
[jira] [Updated] (FLINK-18437) Error message is not correct when using UNNEST
[ https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mzz updated FLINK-18437: Priority: Critical (was: Major) > Error message is not correct when using UNNEST > -- > > Key: FLINK-18437 > URL: https://issues.apache.org/jira/browse/FLINK-18437 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 1.9.3 >Reporter: mzz >Priority: Critical > > .withSchema(new Schema() > .field("ip", Types.STRING()) > .field("ts", Types.STRING()) > .field("environment", Types.ROW(Array("access", "brand"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING))) > .field("advs", ObjectArrayTypeInfo.getInfoFor(new > Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING > ) > .inAppendMode() > .registerTableSource("aggs_test") > The code above is dataSchema,i tried this way > [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but > when i execute this sql: > val sql1 = > """ > |SELECT > |ip, > |ts, > |environment, > |adv > |FROM aggs_test > |CROSS JOIN UNNEST(advs) AS t (adv) > |""".stripMargin > It report an error: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > SQL validation failed. 
From line 8, column 31 to line 8, column 33: List of > column aliases must have same degree as table; table has 2 columns ('access', > 'brand'), whereas alias list has 1 columns > at > org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128) > at > org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83) > at > org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala) > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, > column 31 to line 8, column 33: List of column aliases must have same degree > as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 > columns > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4807) > at > org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:86) > at > org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997) > at > 
org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115) > at > org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:41) > at > org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101) > at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:191) > at > org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:156) > at > org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5699) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5684) > at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5291) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3126) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3108) > at >
[jira] [Commented] (FLINK-18437) Error message is not correct when using UNNEST
[ https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17147193#comment-17147193 ]

mzz commented on FLINK-18437:
-

*Schema:*
.withSchema(new Schema()
  .field("ip", Types.STRING())
  .field("ts", Types.STRING())
  .field("environment", schemaEnvironment)
  .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,
    Types.ROW(Array("count", "sid", "eventid", "params"),
      Array[TypeInformation[_]](Types.STRING(), Types.STRING(), Types.STRING(),
        Types.ROW(Array("adid", "adtype", "ecpm"),
          Array[TypeInformation[_]](Types.STRING(), Types.STRING(), Types.STRING()))))))
)

When I execute this SQL:
{code:java}
val sql =
  """
    |SELECT
    |ip,
    |ts,
    |`count`,
    |environment.access,
    |environment.brand,
    |sid,
    |params.adid,
    |eventid
    |FROM aggs_test
    |CROSS JOIN UNNEST(advs) AS t (`count`, sid, eventid, params)
    |""".stripMargin
{code}

*log:*
11:31:54,379 WARN org.apache.flink.metrics.MetricGroup - The operator name select: (ip, ts, count, environment.access AS access, environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the 80 characters length limit and was truncated.

*Which parameter should I modify? Thanks.*

> Error message is not correct when using UNNEST > -- > > Key: FLINK-18437 > URL: https://issues.apache.org/jira/browse/FLINK-18437 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 1.9.3 >Reporter: mzz >Priority: Major > > .withSchema(new Schema() > .field("ip", Types.STRING()) > .field("ts", Types.STRING()) > .field("environment", Types.ROW(Array("access", "brand"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING))) > .field("advs", ObjectArrayTypeInfo.getInfoFor(new > Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING > ) > .inAppendMode() > .registerTableSource("aggs_test") > The code above is dataSchema,i tried this way > [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but > when i execute this sql: > val sql1 = > """ > |SELECT > |ip, > |ts, > |environment, > |adv > |FROM aggs_test > |CROSS JOIN UNNEST(advs) AS t (adv) > |""".stripMargin > It report an error: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > SQL validation failed. 
From line 8, column 31 to line 8, column 33: List of > column aliases must have same degree as table; table has 2 columns ('access', > 'brand'), whereas alias list has 1 columns > at > org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128) > at > org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83) > at > org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala) > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, > column 31 to line 8, column 33: List of column aliases must have same degree > as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 > columns > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4807) > at > org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:86) > at > org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997) > at >
[jira] [Updated] (FLINK-18437) Error message is not correct when using UNNEST
[ https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mzz updated FLINK-18437: Priority: Major (was: Minor) > Error message is not correct when using UNNEST > -- > > Key: FLINK-18437 > URL: https://issues.apache.org/jira/browse/FLINK-18437 > Project: Flink > Issue Type: Test > Components: API / DataStream >Affects Versions: 1.9.3 >Reporter: mzz >Priority: Major > > .withSchema(new Schema() > .field("ip", Types.STRING()) > .field("ts", Types.STRING()) > .field("environment", Types.ROW(Array("access", "brand"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING))) > .field("advs", ObjectArrayTypeInfo.getInfoFor(new > Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING > ) > .inAppendMode() > .registerTableSource("aggs_test") > The code above is dataSchema,i tried this way > [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but > when i execute this sql: > val sql1 = > """ > |SELECT > |ip, > |ts, > |environment, > |adv > |FROM aggs_test > |CROSS JOIN UNNEST(advs) AS t (adv) > |""".stripMargin > It report an error: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > SQL validation failed. 
From line 8, column 31 to line 8, column 33: List of > column aliases must have same degree as table; table has 2 columns ('access', > 'brand'), whereas alias list has 1 columns > at > org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128) > at > org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83) > at > org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala) > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, > column 31 to line 8, column 33: List of column aliases must have same degree > as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 > columns > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4807) > at > org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:86) > at > org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997) > at > 
org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115) > at > org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:41) > at > org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101) > at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:191) > at > org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:156) > at > org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5699) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5684) > at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5291) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3126) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3108) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3093) >
[jira] [Updated] (FLINK-18437) Error message is not correct when using UNNEST
[ https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mzz updated FLINK-18437: Issue Type: Improvement (was: Test) > Error message is not correct when using UNNEST > -- > > Key: FLINK-18437 > URL: https://issues.apache.org/jira/browse/FLINK-18437 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 1.9.3 >Reporter: mzz >Priority: Major > > .withSchema(new Schema() > .field("ip", Types.STRING()) > .field("ts", Types.STRING()) > .field("environment", Types.ROW(Array("access", "brand"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING))) > .field("advs", ObjectArrayTypeInfo.getInfoFor(new > Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING > ) > .inAppendMode() > .registerTableSource("aggs_test") > The code above is dataSchema,i tried this way > [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but > when i execute this sql: > val sql1 = > """ > |SELECT > |ip, > |ts, > |environment, > |adv > |FROM aggs_test > |CROSS JOIN UNNEST(advs) AS t (adv) > |""".stripMargin > It report an error: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > SQL validation failed. 
From line 8, column 31 to line 8, column 33: List of > column aliases must have same degree as table; table has 2 columns ('access', > 'brand'), whereas alias list has 1 columns > at > org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128) > at > org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83) > at > org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala) > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, > column 31 to line 8, column 33: List of column aliases must have same degree > as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 > columns > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4807) > at > org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:86) > at > org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997) > at > 
org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115) > at > org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:41) > at > org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101) > at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:191) > at > org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:156) > at > org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5699) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5684) > at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5291) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3126) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3108) > at >
[jira] [Updated] (FLINK-18437) Table API has no Functions like sparkSQL explode
[ https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mzz updated FLINK-18437: Issue Type: Test (was: Bug) > Table API has no Functions like sparkSQL explode > - > > Key: FLINK-18437 > URL: https://issues.apache.org/jira/browse/FLINK-18437 > Project: Flink > Issue Type: Test > Components: API / DataStream >Affects Versions: 1.9.3 >Reporter: mzz >Priority: Minor > > .withSchema(new Schema() > .field("ip", Types.STRING()) > .field("ts", Types.STRING()) > .field("environment", Types.ROW(Array("access", "brand"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING))) > .field("advs", ObjectArrayTypeInfo.getInfoFor(new > Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING > ) > .inAppendMode() > .registerTableSource("aggs_test") > The code above is dataSchema,i tried this way > [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but > when i execute this sql: > val sql1 = > """ > |SELECT > |ip, > |ts, > |environment, > |adv > |FROM aggs_test > |CROSS JOIN UNNEST(advs) AS t (adv) > |""".stripMargin > It report an error: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > SQL validation failed. 
From line 8, column 31 to line 8, column 33: List of > column aliases must have same degree as table; table has 2 columns ('access', > 'brand'), whereas alias list has 1 columns > at > org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128) > at > org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83) > at > org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala) > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, > column 31 to line 8, column 33: List of column aliases must have same degree > as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 > columns > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4807) > at > org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:86) > at > org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997) > at > 
org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115) > at > org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:41) > at > org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101) > at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:191) > at > org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:156) > at > org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5699) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5684) > at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5291) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3126) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3108) > at >
[jira] [Commented] (FLINK-18437) Table API has no Functions like sparkSQL explode
[ https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17147162#comment-17147162 ]

mzz commented on FLINK-18437:
-

[~jark] Thank you for your advice; the problem is solved. The error log sent me in the wrong direction: `environment` ('access', 'brand') is a JSON object and `advs` (["count","eventid"]) is a JSON array, yet the error log says "table has 2 columns ('access', 'brand'), whereas alias list has 1 columns".

> Table API has no Functions like sparkSQL explode > - > > Key: FLINK-18437 > URL: https://issues.apache.org/jira/browse/FLINK-18437 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.9.3 >Reporter: mzz >Priority: Major > > .withSchema(new Schema() > .field("ip", Types.STRING()) > .field("ts", Types.STRING()) > .field("environment", Types.ROW(Array("access", "brand"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING))) > .field("advs", ObjectArrayTypeInfo.getInfoFor(new > Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING > ) > .inAppendMode() > .registerTableSource("aggs_test") > The code above is dataSchema,i tried this way > [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but > when i execute this sql: > val sql1 = > """ > |SELECT > |ip, > |ts, > |environment, > |adv > |FROM aggs_test > |CROSS JOIN UNNEST(advs) AS t (adv) > |""".stripMargin > It report an error: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > SQL validation failed. 
From line 8, column 31 to line 8, column 33: List of > column aliases must have same degree as table; table has 2 columns ('access', > 'brand'), whereas alias list has 1 columns > at > org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128) > at > org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83) > at > org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala) > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, > column 31 to line 8, column 33: List of column aliases must have same degree > as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 > columns > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4807) > at > org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:86) > at > org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997) > at > 
org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115) > at > org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:41) > at > org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101) > at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:191) > at > org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:156) > at > org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5699) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5684) > at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5291) >
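The resolution described in the comment above comes down to one rule: the alias list after `AS t (...)` has to name every field of the array's element row, so `advs` elements with fields `count` and `eventid` need two aliases, not one. The following is only a rough, Flink-free sketch of that rule in plain Scala. The names `UnnestSketch`, `Event`, and `unnest` are hypothetical and are not part of any Flink or Calcite API, and the arity check merely imitates what the SQL validator appears to enforce.

```scala
object UnnestSketch {
  // Hypothetical input record: scalar columns plus an array of element rows.
  // Each element row is modelled as an ordered sequence of field values.
  final case class Event(ip: String, ts: String, advs: Seq[Seq[String]])

  // Models CROSS JOIN UNNEST(advs) AS t (<aliases>).
  // `elementArity` is the number of fields in one element row of `advs`.
  // Returns Left with a message when the alias count does not match,
  // otherwise one output row per array element.
  def unnest(
      event: Event,
      elementArity: Int,
      aliases: Seq[String]): Either[String, Seq[Map[String, String]]] =
    if (aliases.length != elementArity)
      Left(s"alias list has ${aliases.length} columns, element row has $elementArity")
    else
      Right(event.advs.map { adv =>
        // Repeat the outer columns and attach the aliased element fields.
        Map("ip" -> event.ip, "ts" -> event.ts) ++ aliases.zip(adv)
      })
}
```

Under these assumptions, calling `unnest` with the single alias `Seq("adv")` on a two-field element row yields a `Left`, mirroring the failing query, while `Seq("count", "eventid")` yields one row per element of `advs`.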
[jira] [Updated] (FLINK-18437) Table API has no Functions like sparkSQL explode
[ https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mzz updated FLINK-18437: Priority: Minor (was: Major) > Table API has no Functions like sparkSQL explode > - > > Key: FLINK-18437 > URL: https://issues.apache.org/jira/browse/FLINK-18437 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.9.3 >Reporter: mzz >Priority: Minor > > .withSchema(new Schema() > .field("ip", Types.STRING()) > .field("ts", Types.STRING()) > .field("environment", Types.ROW(Array("access", "brand"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING))) > .field("advs", ObjectArrayTypeInfo.getInfoFor(new > Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), > Array[TypeInformation[_]](Types.STRING(), Types.STRING > ) > .inAppendMode() > .registerTableSource("aggs_test") > The code above is dataSchema,i tried this way > [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but > when i execute this sql: > val sql1 = > """ > |SELECT > |ip, > |ts, > |environment, > |adv > |FROM aggs_test > |CROSS JOIN UNNEST(advs) AS t (adv) > |""".stripMargin > It report an error: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > SQL validation failed. 
From line 8, column 31 to line 8, column 33: List of > column aliases must have same degree as table; table has 2 columns ('access', > 'brand'), whereas alias list has 1 columns > at > org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128) > at > org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83) > at > org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala) > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, > column 31 to line 8, column 33: List of column aliases must have same degree > as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 > columns > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4807) > at > org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:86) > at > org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997) > at > 
org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115) > at > org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:41) > at > org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101) > at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:191) > at > org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:156) > at > org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5699) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5684) > at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5291) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3126) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3108) > at >
[jira] [Commented] (FLINK-18437) Table API has no Functions like sparkSQL explode
[ https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17147148#comment-17147148 ]

mzz commented on FLINK-18437:
-
Executing select * from the table and printing the schema gives:
root
 |-- ip: STRING
 |-- ts: STRING
 |-- environment: ROW<`access` STRING, `brand` STRING>
 |-- advs: ARRAY<ROW<`count` STRING, `eventid` STRING>>
I want to split the array into multiple rows. Thanks.

> Table API has no Functions like sparkSQL explode
> -
>
> Key: FLINK-18437
> URL: https://issues.apache.org/jira/browse/FLINK-18437
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.9.3
> Reporter: mzz
> Priority: Major
>
> .withSchema(new Schema()
>   .field("ip", Types.STRING())
>   .field("ts", Types.STRING())
>   .field("environment", Types.ROW(Array("access", "brand"),
>     Array[TypeInformation[_]](Types.STRING(), Types.STRING)))
>   .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,
>     Types.ROW(Array("count", "eventid"),
>       Array[TypeInformation[_]](Types.STRING(), Types.STRING))))
> )
> .inAppendMode()
> .registerTableSource("aggs_test")
>
> The code above is the data schema. I tried the approach described at
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html
> but when I execute this SQL:
> val sql1 =
> """
> |SELECT
> |ip,
> |ts,
> |environment,
> |adv
> |FROM aggs_test
> |CROSS JOIN UNNEST(advs) AS t (adv)
> |""".stripMargin
> it reports an error:
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> SQL validation failed.
From line 8, column 31 to line 8, column 33: List of > column aliases must have same degree as table; table has 2 columns ('access', > 'brand'), whereas alias list has 1 columns > at > org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128) > at > org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83) > at > org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88) > at > QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala) > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, > column 31 to line 8, column 33: List of column aliases must have same degree > as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 > columns > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4807) > at > org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:86) > at > org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997) > at > 
org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115) > at > org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:41) > at > org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101) > at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:191) > at > org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:156) > at > org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5699) > at > org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5684) > at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5291) > at >
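The validation error quoted above complains that the alias list after UNNEST has one column while the unnested table has two. A minimal sketch of a query whose alias list matches that degree follows. It assumes that UNNEST over an array of ROW exposes each ROW field as its own column, which has not been verified against Flink 1.9.3 here. The aliases `cnt` and `event_id` are made up for illustration. Note also that the error names 'access' and 'brand', which are the fields of `environment` rather than of `advs`, so the row type the planner actually resolved is unclear from the thread.

```scala
// Hypothetical sketch: field names come from the schema quoted in this issue;
// the aliases `cnt` and `event_id` are invented for illustration only.
object UnnestAliasSketch {
  // Fields of the ROW type stored in each element of `advs`.
  val advFields: Seq[String] = Seq("count", "eventid")

  // One alias per ROW field, so the alias list has the same degree as the unnested table.
  val aliases: Seq[String] = Seq("cnt", "event_id")

  // The query text only; running it still requires a Flink table environment.
  val sql: String =
    s"""
       |SELECT
       |  ip,
       |  ts,
       |  environment,
       |  ${aliases.mkString(", ")}
       |FROM aggs_test
       |CROSS JOIN UNNEST(advs) AS t (${aliases.mkString(", ")})
       |""".stripMargin

  def main(args: Array[String]): Unit = {
    require(aliases.size == advFields.size, "alias list must match the ROW degree")
    println(sql)
  }
}
```

The resulting string would be passed to `sqlQuery` in place of `sql1`. The alias `cnt` avoids using `count`, a reserved word, as a bare column name.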
[jira] [Created] (FLINK-18437) Table API has no Functions like sparkSQL explode
mzz created FLINK-18437:
---

Summary: Table API has no Functions like sparkSQL explode
Key: FLINK-18437
URL: https://issues.apache.org/jira/browse/FLINK-18437
Project: Flink
Issue Type: Bug
Components: API / DataStream
Affects Versions: 1.9.3
Reporter: mzz

.withSchema(new Schema()
  .field("ip", Types.STRING())
  .field("ts", Types.STRING())
  .field("environment", Types.ROW(Array("access", "brand"),
    Array[TypeInformation[_]](Types.STRING(), Types.STRING)))
  .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,
    Types.ROW(Array("count", "eventid"),
      Array[TypeInformation[_]](Types.STRING(), Types.STRING))))
)
.inAppendMode()
.registerTableSource("aggs_test")

The code above is the data schema. I tried the approach described at
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html
but when I execute this SQL:
val sql1 =
"""
|SELECT
|ip,
|ts,
|environment,
|adv
|FROM aggs_test
|CROSS JOIN UNNEST(advs) AS t (adv)
|""".stripMargin
it reports an error:
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed.
From line 8, column 31 to line 8, column 33: List of column aliases must have same degree as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 columns at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128) at org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83) at org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298) at QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88) at QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala) Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, column 31 to line 8, column 33: List of column aliases must have same degree as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 columns at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809) at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4807) at org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:86) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997) at org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115) at 
org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:41) at org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101) at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:191) at org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:156) at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238) at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5699) at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5684) at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) at org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5291) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3126) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3108) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3093) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3158) at org.apache.flink.table.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.scala:67) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3102) at
[jira] [Commented] (FLINK-18230) Table API has no Functions like sparkSQL explode
[ https://issues.apache.org/jira/browse/FLINK-18230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17130097#comment-17130097 ]

mzz commented on FLINK-18230:
-
As for my SQL, my idea is to flatten the nested array, as the explode function does in Spark SQL. For example, in Spark SQL:
"select ip, ts,launchs,adv.`count` from aggs_test LATERAL VIEW explode(advs) a AS adv"

> Table API has no Functions like sparkSQL explode
> -
>
> Key: FLINK-18230
> URL: https://issues.apache.org/jira/browse/FLINK-18230
> Project: Flink
> Issue Type: Improvement
> Reporter: mzz
> Priority: Major
>
> streamTableEnvironment.connect(new Kafka()
>   .topic(TOPIC)
>   .version(VERSION)
>   .startFromEarliest()
>   .property("bootstrap.servers", "172.16.30.207:9092")
>   .property("group.id", "km_aggs_group_3")
> )
> .withFormat(
>   new Json()
>     .failOnMissingField(true)
>     .deriveSchema()
> )
> .withSchema(new Schema()
>   .field("devs", Types.STRING())
>   .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,
>     Types.ROW(Array("count", "eventid", "sid", "params"),
>       Array[TypeInformation[_]](Types.STRING(), Types.STRING, Types.STRING,
>         Types.ROW(Array("adid", "adtype", "ecpm"),
>           Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.STRING))))
>   ))
>   .field("identity", Types.STRING())
>   .field("ip", Types.STRING())
>   .field("launchs", Types.STRING())
>   .field("ts", Types.STRING())
> )
> .inAppendMode()
> .registerTableSource("aggs_test")
> val tableResult = streamTableEnvironment.sqlQuery("select ip,
> ts,launchs,advs[1].`count` from aggs_test")
> tableResult.printSchema()
> streamTableEnvironment.toAppendStream[Row](tableResult).print()

-- This message was sent by Atlassian Jira (v8.3.4#803005)
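To make the requested behaviour concrete, here is a plain-Scala sketch of what "explode" means for the data in this issue: one output row per element of `advs`, with the parent columns repeated. It uses no Flink or Spark API. The case classes and sample values are hypothetical and only mirror the field names quoted above.

```scala
// Plain-Scala sketch of "explode" semantics; no Flink or Spark dependency.
// The case classes and sample values below are hypothetical.
object ExplodeSketch {
  final case class Adv(count: String, eventid: String)
  final case class Event(ip: String, ts: String, launchs: String, advs: Seq[Adv])
  final case class Flat(ip: String, ts: String, launchs: String, count: String)

  // Emit one output row per element of `advs`, repeating the parent columns.
  // Events whose array is empty produce no output rows.
  def explode(events: Seq[Event]): Seq[Flat] =
    for {
      e   <- events
      adv <- e.advs
    } yield Flat(e.ip, e.ts, e.launchs, adv.count)

  def main(args: Array[String]): Unit = {
    val input = Seq(
      Event("10.0.0.1", "1591000000", "1", Seq(Adv("3", "adreq"), Adv("5", "adshow"))),
      Event("10.0.0.2", "1591000001", "2", Seq.empty)
    )
    explode(input).foreach(println)
  }
}
```

By contrast, the query in the issue, advs[1].`count`, reads a single array element (SQL array indexes start at 1), so it returns at most one value per input row instead of one row per element.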
[jira] [Updated] (FLINK-18230) Table API has no Functions like sparkSQL explode
[ https://issues.apache.org/jira/browse/FLINK-18230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

mzz updated FLINK-18230:
Issue Type: Improvement (was: Bug)

> Table API has no Functions like sparkSQL explode
> -
>
> Key: FLINK-18230
> URL: https://issues.apache.org/jira/browse/FLINK-18230
> Project: Flink
> Issue Type: Improvement
> Reporter: mzz
> Priority: Major

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18230) Table API has no Functions like sparkSQL explode
mzz created FLINK-18230:
---

Summary: Table API has no Functions like sparkSQL explode
Key: FLINK-18230
URL: https://issues.apache.org/jira/browse/FLINK-18230
Project: Flink
Issue Type: Bug
Reporter: mzz

streamTableEnvironment.connect(new Kafka()
  .topic(TOPIC)
  .version(VERSION)
  .startFromEarliest()
  .property("bootstrap.servers", "172.16.30.207:9092")
  .property("group.id", "km_aggs_group_3")
)
.withFormat(
  new Json()
    .failOnMissingField(true)
    .deriveSchema()
)
.withSchema(new Schema()
  .field("devs", Types.STRING())
  .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,
    Types.ROW(Array("count", "eventid", "sid", "params"),
      Array[TypeInformation[_]](Types.STRING(), Types.STRING, Types.STRING,
        Types.ROW(Array("adid", "adtype", "ecpm"),
          Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.STRING))))
  ))
  .field("identity", Types.STRING())
  .field("ip", Types.STRING())
  .field("launchs", Types.STRING())
  .field("ts", Types.STRING())
)
.inAppendMode()
.registerTableSource("aggs_test")
val tableResult = streamTableEnvironment.sqlQuery("select ip, ts,launchs,advs[1].`count` from aggs_test")
tableResult.printSchema()
streamTableEnvironment.toAppendStream[Row](tableResult).print()

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-18184) Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory'
[ https://issues.apache.org/jira/browse/FLINK-18184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

mzz updated FLINK-18184:
Priority: Major (was: Critical)

> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.TableSourceFactory'
> -
>
> Key: FLINK-18184
> URL: https://issues.apache.org/jira/browse/FLINK-18184
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.9.1
> Environment: local:macos
> flink1.9
> Reporter: mzz
> Priority: Major
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.enableCheckpointing(5000) // checkpoint every 5000 msecs
> // Kafka configuration
> val properties = new Properties()
> properties.setProperty("bootstrap.servers", "172.16.30.207:9092")
> properties.setProperty("group.id", "km_aggs_group")
> val fsSettings =
> EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
> val kafkaConsumer = new FlinkKafkaConsumer[String](TOPIC, new
> SimpleStringSchema(), properties).setStartFromEarliest()
> //val source = env.addSource(kafkaConsumer)
> val streamTableEnvironment = StreamTableEnvironment.create(env,fsSettings)
> streamTableEnvironment.connect(new Kafka()
>   .topic(TOPIC)
>   .version(VERSION)
>   .startFromEarliest()
>   .property("bootstrap.servers", "172.16.30.207:9092")
>   .property("zookeeper.connect", "172.16.30.207:2181")
>   .property("group.id", "km_aggs_group_table")
>   // .properties(properties)
> )
> .withFormat(
>   new Json()
>     .failOnMissingField(true)
>     .deriveSchema()
> )
> .withSchema(new Schema()
>   .field("advs", Types.STRING())
>   .field("devs", Types.STRING())
>   .field("environment", Types.STRING())
>   .field("events", Types.STRING())
>   .field("identity", Types.STRING())
>   .field("ip", Types.STRING())
>   .field("launchs", Types.STRING())
>   .field("ts", Types.STRING())
> )
> .inAppendMode()
> .registerTableSource("aggs_test")
> val tableResult = streamTableEnvironment.sqlQuery("select * from
> aggs_test")
> tableResult.printSchema()
> //streamTableEnvironment.toAppendStream[Row](tableResult).print()
> // start the job
> env.execute("test_kafka")
>
> Error message:
> Exception in thread "main" org.apache.flink.table.api.TableException:
> findAndCreateTableSource failed.
> at
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
> at
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
> at
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
> at
> KM.COM.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:70)
> at
> KM.COM.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala)
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could
> not find a suitable table factory for
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
> Reason: No context matches.
> The following properties are requested:
> connector.properties.0.key=zookeeper.connect
> connector.properties.0.value=172.16.30.207:2181
> connector.properties.1.key=group.id
> connector.properties.1.value=km_aggs_group_table
> connector.properties.2.key=bootstrap.servers
> connector.properties.2.value=172.16.30.207:9092
> connector.property-version=1
> connector.startup-mode=earliest-offset
> connector.topic=aggs_topic
> connector.type=kafka
> connector.version=2.0
> format.derive-schema=true
> format.fail-on-missing-field=true
> format.property-version=1
> format.type=json
> schema.0.name=advs
> schema.0.type=VARCHAR
> schema.1.name=devs
> schema.1.type=VARCHAR
> schema.2.name=environment
> schema.2.type=VARCHAR
> schema.3.name=events
> schema.3.type=VARCHAR
> schema.4.name=identity
> schema.4.type=VARCHAR
> schema.5.name=ip
> schema.5.type=VARCHAR
> schema.6.name=launchs
> schema.6.type=VARCHAR
> schema.7.name=ts
> schema.7.type=VARCHAR
> update-mode=append
> The following factories have been
considered: > org.apache.flink.formats.json.JsonRowFormatFactory > org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory > org.apache.flink.table.planner.StreamPlannerFactory > org.apache.flink.table.executor.StreamExecutorFactory > org.apache.flink.table.sources.CsvBatchTableSourceFactory > org.apache.flink.table.sources.CsvAppendTableSourceFactory > org.apache.flink.table.sinks.CsvBatchTableSinkFactory >
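One property in the requested list stands out: connector.version=2.0. As far as I recall, the Flink 1.9 connector documentation lists only "0.8", "0.9", "0.10", "0.11" and "universal" as valid Kafka connector versions, which could explain "No context matches" even though KafkaTableSourceSinkFactory was considered. The sketch below is a hedged illustration of that check. The version set is copied from memory of the documentation as an assumption and is not read from Flink itself, and `KafkaVersionCheck` is a hypothetical helper.

```scala
// Sketch only: the set below is an assumption based on the connector versions
// named in the Flink 1.9 documentation; it is not read from Flink itself.
object KafkaVersionCheck {
  val documentedVersions: Set[String] = Set("0.8", "0.9", "0.10", "0.11", "universal")

  // True when the given connector.version string is one of the documented values.
  def isDocumented(version: String): Boolean = documentedVersions.contains(version)

  def main(args: Array[String]): Unit = {
    val requested = "2.0" // value of connector.version in the error output above
    println(s"connector.version=$requested documented: ${isDocumented(requested)}")
  }
}
```

If this reading is right, passing "universal" to `.version(...)` instead of the broker version would be the thing to try; the thread itself does not confirm the fix.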
[jira] [Created] (FLINK-18184) Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory'
mzz created FLINK-18184:
---

Summary: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory'
Key: FLINK-18184
URL: https://issues.apache.org/jira/browse/FLINK-18184
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.9.1
Environment: local:macos flink1.9
Reporter: mzz

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
// Kafka configuration
val properties = new Properties()
properties.setProperty("bootstrap.servers", "172.16.30.207:9092")
properties.setProperty("group.id", "km_aggs_group")
val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val kafkaConsumer = new FlinkKafkaConsumer[String](TOPIC, new SimpleStringSchema(), properties).setStartFromEarliest()
//val source = env.addSource(kafkaConsumer)
val streamTableEnvironment = StreamTableEnvironment.create(env,fsSettings)
streamTableEnvironment.connect(new Kafka()
  .topic(TOPIC)
  .version(VERSION)
  .startFromEarliest()
  .property("bootstrap.servers", "172.16.30.207:9092")
  .property("zookeeper.connect", "172.16.30.207:2181")
  .property("group.id", "km_aggs_group_table")
  // .properties(properties)
)
.withFormat(
  new Json()
    .failOnMissingField(true)
    .deriveSchema()
)
.withSchema(new Schema()
  .field("advs", Types.STRING())
  .field("devs", Types.STRING())
  .field("environment", Types.STRING())
  .field("events", Types.STRING())
  .field("identity", Types.STRING())
  .field("ip", Types.STRING())
  .field("launchs", Types.STRING())
  .field("ts", Types.STRING())
)
.inAppendMode()
.registerTableSource("aggs_test")
val tableResult = streamTableEnvironment.sqlQuery("select * from aggs_test")
tableResult.printSchema()
//streamTableEnvironment.toAppendStream[Row](tableResult).print()
// start the job
env.execute("test_kafka")

Error message:
Exception in thread "main" org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67) at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54) at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69) at KM.COM.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:70) at KM.COM.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala) Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. Reason: No context matches. The following properties are requested: connector.properties.0.key=zookeeper.connect connector.properties.0.value=172.16.30.207:2181 connector.properties.1.key=group.id connector.properties.1.value=km_aggs_group_table connector.properties.2.key=bootstrap.servers connector.properties.2.value=172.16.30.207:9092 connector.property-version=1 connector.startup-mode=earliest-offset connector.topic=aggs_topic connector.type=kafka connector.version=2.0 format.derive-schema=true format.fail-on-missing-field=true format.property-version=1 format.type=json schema.0.name=advs schema.0.type=VARCHAR schema.1.name=devs schema.1.type=VARCHAR schema.2.name=environment schema.2.type=VARCHAR schema.3.name=events schema.3.type=VARCHAR schema.4.name=identity schema.4.type=VARCHAR schema.5.name=ip schema.5.type=VARCHAR schema.6.name=launchs schema.6.type=VARCHAR schema.7.name=ts schema.7.type=VARCHAR update-mode=append The following factories have been considered: org.apache.flink.formats.json.JsonRowFormatFactory org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory org.apache.flink.table.planner.StreamPlannerFactory org.apache.flink.table.executor.StreamExecutorFactory org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory 
org.apache.flink.table.sinks.CsvBatchTableSinkFactory org.apache.flink.table.sinks.CsvAppendTableSinkFactory org.apache.flink.table.catalog.GenericInMemoryCatalogFactory at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283) at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191) at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144) at