[jira] [Created] (FLINK-18868) Converting data into JSON format with flinksql

2020-08-10 Thread mzz (Jira)
mzz created FLINK-18868:
---

 Summary: Converting data into JSON format with flinksql
 Key: FLINK-18868
 URL: https://issues.apache.org/jira/browse/FLINK-18868
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.10.0
Reporter: mzz


Convert data into JSON format with Flink SQL,
similar to Spark's function to_json.
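
Flink 1.10 does not ship a built-in TO_JSON function in SQL, so until one exists a user-defined scalar function is one possible workaround. A minimal sketch, assuming Jackson is on the classpath; the function name, column names, and registration call below are illustrative, not part of this request:

{code:java}
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.table.functions.ScalarFunction

// Hypothetical UDF: serializes a few columns into a JSON string.
class ToJson extends ScalarFunction {
  @transient private lazy val mapper = new ObjectMapper()

  def eval(ip: String, ts: String, adid: String): String = {
    val node = mapper.createObjectNode()
    node.put("ip", ip)
    node.put("ts", ts)
    node.put("adid", adid)
    mapper.writeValueAsString(node)
  }
}

// Registration and use (Flink 1.10 Table API style):
// streamTableEnvironment.registerFunction("TO_JSON", new ToJson)
// streamTableEnvironment.sqlQuery("SELECT TO_JSON(ip, ts, adid) FROM aggs_test")
{code}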





[jira] [Comment Edited] (FLINK-18741) ProcessWindowFunction's process function exception

2020-08-05 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17171876#comment-17171876
 ] 

mzz edited comment on FLINK-18741 at 8/6/20, 2:53 AM:
--

[~aljoscha] 
Thanks for your reply. It doesn't throw any exception.
When I use ProcessWindowFunction and override the process function, I traverse 
the elements iterator inside it, but the value observed outside the loop is 
reset to its initial value. I define a variable in the process function, assign 
it an initial value, and increment it by 1 while traversing the iterator. 
However, when the window ends, the variable is back at its initial value.

*code:*

{code:java}
elements.foreach(e => {
  if ("adreq".equals(e._3)) {
    requestNum += 1
    println(key._1, requestNum)
    // The values printed here look like:
    // (key,1)
    // (key,2)
    // (key,3)
  }
})
// But the print outside the loop always shows:
// (key,0)
println(requestNum, key._1)
{code}
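
A minimal sketch of the pattern under discussion (state handles as declared in the issue below; an illustration, not a verified fix): a local var is re-created on every process() invocation, so its increments survive only if they are written back to the ValueState before the call returns:

{code:java}
// Read the keyed state; in Scala an empty ValueState[Long] unboxes to 0L.
var requestNum: Long = requestCount.value
elements.foreach { e =>
  if ("adreq".equals(e._3)) requestNum += 1
}
requestCount.update(requestNum)  // persist before process() returns
println(requestNum, key._1)      // now reflects the accumulated count
{code}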




was (Author: mzz_q):
[~aljoscha] 
Thanks for your reply.
When I use ProcessWindowFunction and override the process function, I traverse 
the elements iterator inside it, but the value observed outside the loop is 
reset to its initial value. I define a variable in the process function, assign 
it an initial value, and increment it by 1 while traversing the iterator. 
However, when the window ends, the variable is back at its initial value.

*code:*

{code:java}
elements.foreach(e => {
  if ("adreq".equals(e._3)) {
    requestNum += 1
    println(key._1, requestNum)
    // The values printed here look like:
    // (key,1)
    // (key,2)
    // (key,3)
  }
})
// But the print outside the loop always shows:
// (key,0)
println(requestNum, key._1)
{code}



> ProcessWindowFunction's  process function exception
> ---
>
> Key: FLINK-18741
> URL: https://issues.apache.org/jira/browse/FLINK-18741
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Critical
>
> I use ProcessWindowFunction to compute PV, but after overriding process, the 
> user-defined state value cannot be read back.
> code:
> {code:java}
> tem.keyBy(x =>
>   (x._1, x._2, x._4, x._5, x._6, x._7, x._8))
>   .timeWindow(Time.seconds(15 * 60)) //15 min window
>   .process(new ProcessWindowFunction[(String, String, String, String, 
> String, String, String, String, String), CkResult, (String, String, String, 
> String, String, String, String), TimeWindow] {
>   var clickCount: ValueState[Long] = _
>   var requestCount: ValueState[Long] = _
>   var returnCount: ValueState[Long] = _
>   var videoCount: ValueState[Long] = _
>   var noVideoCount: ValueState[Long] = _
>   override def open(parameters: Configuration): Unit = {
> clickCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("clickCount", classOf[Long]))
> requestCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("requestCount", classOf[Long]))
> returnCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("returnCount", classOf[Long]))
> videoCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("videoCount", classOf[Long]))
> noVideoCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("noVideoCount", classOf[Long]))
>   }
>   override def process(key: (String, String, String, String, String, 
> String, String), context: Context, elements: Iterable[(String, String, 
> String, String, String, String, String, String, String)], out: 
> Collector[CkResult]) = {
> try {
>   var clickNum: Long = clickCount.value
>   val dateNow = 
> LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")).toLong
>   var requestNum: Long = requestCount.value
>   var returnNum: Long = returnCount.value
>   var videoNum: Long = videoCount.value
>   var noVideoNum: Long = noVideoCount.value
>   if (requestNum == null) {
> requestNum = 0
>   }
>   
>   val ecpm = key._7.toDouble.formatted("%.2f").toFloat
>   val created_at = getSecondTimestampTwo(new Date)
>  
>   elements.foreach(e => {
> if ("adreq".equals(e._3)) {
>   requestNum += 1
>   println(key._1, requestNum)
> }
>   })
>   requestCount.update(requestNum)
>   println(requestNum, key._1)
>   
>   out.collect(CkResult(dateNow, (created_at - getZero_time) / (60 * 
> 15), key._2, key._3, key._4, key._5, key._3 + "_" + key._4 + "_" + key._5, 
> key._6, 

[jira] [Comment Edited] (FLINK-18741) ProcessWindowFunction's process function exception

2020-08-05 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17171876#comment-17171876
 ] 

mzz edited comment on FLINK-18741 at 8/6/20, 2:52 AM:
--

[~aljoscha] 
Thanks for your reply.
When I use ProcessWindowFunction and override the process function, I traverse 
the elements iterator inside it, but the value observed outside the loop is 
reset to its initial value. I define a variable in the process function, assign 
it an initial value, and increment it by 1 while traversing the iterator. 
However, when the window ends, the variable is back at its initial value.

*code:*

{code:java}
elements.foreach(e => {
  if ("adreq".equals(e._3)) {
    requestNum += 1
    println(key._1, requestNum)
    // The values printed here look like:
    // (key,1)
    // (key,2)
    // (key,3)
  }
})
// But the print outside the loop always shows:
// (key,0)
println(requestNum, key._1)
{code}




was (Author: mzz_q):
When I use ProcessWindowFunction and override the process function, I traverse 
the elements iterator inside it, but the value observed outside the loop is 
reset to its initial value. I define a variable in the process function, assign 
it an initial value, and increment it by 1 while traversing the iterator. 
However, when the window ends, the variable is back at its initial value.

*code:*

{code:java}
elements.foreach(e => {
  if ("adreq".equals(e._3)) {
    requestNum += 1
    println(key._1, requestNum)
    // The values printed here look like:
    // (key,1)
    // (key,2)
    // (key,3)
  }
})
// But the print outside the loop always shows:
// (key,0)
println(requestNum, key._1)
{code}



> ProcessWindowFunction's  process function exception
> ---
>
> Key: FLINK-18741
> URL: https://issues.apache.org/jira/browse/FLINK-18741
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Critical
>
> I use ProcessWindowFunction to compute PV, but after overriding process, the 
> user-defined state value cannot be read back.
> code:
> {code:java}
> tem.keyBy(x =>
>   (x._1, x._2, x._4, x._5, x._6, x._7, x._8))
>   .timeWindow(Time.seconds(15 * 60)) //15 min window
>   .process(new ProcessWindowFunction[(String, String, String, String, 
> String, String, String, String, String), CkResult, (String, String, String, 
> String, String, String, String), TimeWindow] {
>   var clickCount: ValueState[Long] = _
>   var requestCount: ValueState[Long] = _
>   var returnCount: ValueState[Long] = _
>   var videoCount: ValueState[Long] = _
>   var noVideoCount: ValueState[Long] = _
>   override def open(parameters: Configuration): Unit = {
> clickCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("clickCount", classOf[Long]))
> requestCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("requestCount", classOf[Long]))
> returnCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("returnCount", classOf[Long]))
> videoCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("videoCount", classOf[Long]))
> noVideoCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("noVideoCount", classOf[Long]))
>   }
>   override def process(key: (String, String, String, String, String, 
> String, String), context: Context, elements: Iterable[(String, String, 
> String, String, String, String, String, String, String)], out: 
> Collector[CkResult]) = {
> try {
>   var clickNum: Long = clickCount.value
>   val dateNow = 
> LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")).toLong
>   var requestNum: Long = requestCount.value
>   var returnNum: Long = returnCount.value
>   var videoNum: Long = videoCount.value
>   var noVideoNum: Long = noVideoCount.value
>   if (requestNum == null) {
> requestNum = 0
>   }
>   
>   val ecpm = key._7.toDouble.formatted("%.2f").toFloat
>   val created_at = getSecondTimestampTwo(new Date)
>  
>   elements.foreach(e => {
> if ("adreq".equals(e._3)) {
>   requestNum += 1
>   println(key._1, requestNum)
> }
>   })
>   requestCount.update(requestNum)
>   println(requestNum, key._1)
>   
>   out.collect(CkResult(dateNow, (created_at - getZero_time) / (60 * 
> 15), key._2, key._3, key._4, key._5, key._3 + "_" + key._4 + "_" + key._5, 
> key._6, key._1, requestCount.value, returnCount.value, fill_rate, 
> 

[jira] [Commented] (FLINK-18741) ProcessWindowFunction's process function exception

2020-08-05 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17171876#comment-17171876
 ] 

mzz commented on FLINK-18741:
-

When I use ProcessWindowFunction and override the process function, I traverse 
the elements iterator inside it, but the value observed outside the loop is 
reset to its initial value. I define a variable in the process function, assign 
it an initial value, and increment it by 1 while traversing the iterator. 
However, when the window ends, the variable is back at its initial value.

*code:*

{code:java}
elements.foreach(e => {
  if ("adreq".equals(e._3)) {
    requestNum += 1
    println(key._1, requestNum)
    // The values printed here look like:
    // (key,1)
    // (key,2)
    // (key,3)
  }
})
// But the print outside the loop always shows:
// (key,0)
println(requestNum, key._1)
{code}



> ProcessWindowFunction's  process function exception
> ---
>
> Key: FLINK-18741
> URL: https://issues.apache.org/jira/browse/FLINK-18741
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Critical
>
> I use ProcessWindowFunction to compute PV, but after overriding process, the 
> user-defined state value cannot be read back.
> code:
> {code:java}
> tem.keyBy(x =>
>   (x._1, x._2, x._4, x._5, x._6, x._7, x._8))
>   .timeWindow(Time.seconds(15 * 60)) //15 min window
>   .process(new ProcessWindowFunction[(String, String, String, String, 
> String, String, String, String, String), CkResult, (String, String, String, 
> String, String, String, String), TimeWindow] {
>   var clickCount: ValueState[Long] = _
>   var requestCount: ValueState[Long] = _
>   var returnCount: ValueState[Long] = _
>   var videoCount: ValueState[Long] = _
>   var noVideoCount: ValueState[Long] = _
>   override def open(parameters: Configuration): Unit = {
> clickCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("clickCount", classOf[Long]))
> requestCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("requestCount", classOf[Long]))
> returnCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("returnCount", classOf[Long]))
> videoCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("videoCount", classOf[Long]))
> noVideoCount = getRuntimeContext.getState(new 
> ValueStateDescriptor("noVideoCount", classOf[Long]))
>   }
>   override def process(key: (String, String, String, String, String, 
> String, String), context: Context, elements: Iterable[(String, String, 
> String, String, String, String, String, String, String)], out: 
> Collector[CkResult]) = {
> try {
>   var clickNum: Long = clickCount.value
>   val dateNow = 
> LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")).toLong
>   var requestNum: Long = requestCount.value
>   var returnNum: Long = returnCount.value
>   var videoNum: Long = videoCount.value
>   var noVideoNum: Long = noVideoCount.value
>   if (requestNum == null) {
> requestNum = 0
>   }
>   
>   val ecpm = key._7.toDouble.formatted("%.2f").toFloat
>   val created_at = getSecondTimestampTwo(new Date)
>  
>   elements.foreach(e => {
> if ("adreq".equals(e._3)) {
>   requestNum += 1
>   println(key._1, requestNum)
> }
>   })
>   requestCount.update(requestNum)
>   println(requestNum, key._1)
>   
>   out.collect(CkResult(dateNow, (created_at - getZero_time) / (60 * 
> 15), key._2, key._3, key._4, key._5, key._3 + "_" + key._4 + "_" + key._5, 
> key._6, key._1, requestCount.value, returnCount.value, fill_rate, 
> noVideoCount.value + videoCount.value,
> expose_rate, clickCount.value, click_rate, ecpm, 
> (noVideoCount.value * ecpm + videoCount.value * ecpm / 
> 1000.toFloat).formatted("%.2f").toFloat, created_at))
> }
> catch {
>   case e: Exception => println(key, e)
> }
>   }
> })
> {code}
> {code:java}
> elements.foreach(e => {
>   if ("adreq".equals(e._3)) {
>     requestNum += 1
>     println(key._1, requestNum)
>     // The values printed here look like:
>     // (key,1)
>     // (key,2)
>     // (key,3)
>   }
> })
> // But the print outside the loop always shows:
> // (key,0)
> println(requestNum, key._1)
> {code}
> Who can help me? Thanks.





[jira] [Created] (FLINK-18741) ProcessWindowFunction's process function exception

2020-07-28 Thread mzz (Jira)
mzz created FLINK-18741:
---

 Summary: ProcessWindowFunction's  process function exception
 Key: FLINK-18741
 URL: https://issues.apache.org/jira/browse/FLINK-18741
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.10.0
Reporter: mzz


I use ProcessWindowFunction to compute PV, but after overriding process, the 
user-defined state value cannot be read back.
code:

{code:java}
tem.keyBy(x =>
  (x._1, x._2, x._4, x._5, x._6, x._7, x._8))
  .timeWindow(Time.seconds(15 * 60)) //15 min window
  .process(new ProcessWindowFunction[(String, String, String, String, 
String, String, String, String, String), CkResult, (String, String, String, 
String, String, String, String), TimeWindow] {
  var clickCount: ValueState[Long] = _
  var requestCount: ValueState[Long] = _
  var returnCount: ValueState[Long] = _
  var videoCount: ValueState[Long] = _
  var noVideoCount: ValueState[Long] = _

  override def open(parameters: Configuration): Unit = {
clickCount = getRuntimeContext.getState(new 
ValueStateDescriptor("clickCount", classOf[Long]))
requestCount = getRuntimeContext.getState(new 
ValueStateDescriptor("requestCount", classOf[Long]))
returnCount = getRuntimeContext.getState(new 
ValueStateDescriptor("returnCount", classOf[Long]))
videoCount = getRuntimeContext.getState(new 
ValueStateDescriptor("videoCount", classOf[Long]))
noVideoCount = getRuntimeContext.getState(new 
ValueStateDescriptor("noVideoCount", classOf[Long]))
  }

  override def process(key: (String, String, String, String, String, 
String, String), context: Context, elements: Iterable[(String, String, String, 
String, String, String, String, String, String)], out: Collector[CkResult]) = {
try {
  var clickNum: Long = clickCount.value
  val dateNow = 
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")).toLong
  var requestNum: Long = requestCount.value
  var returnNum: Long = returnCount.value
  var videoNum: Long = videoCount.value
  var noVideoNum: Long = noVideoCount.value
  if (requestNum == null) {
requestNum = 0
  }
  
  val ecpm = key._7.toDouble.formatted("%.2f").toFloat
  val created_at = getSecondTimestampTwo(new Date)
 
  elements.foreach(e => {
if ("adreq".equals(e._3)) {
  requestNum += 1
  println(key._1, requestNum)
}
  })
  requestCount.update(requestNum)
  println(requestNum, key._1)

  

  out.collect(CkResult(dateNow, (created_at - getZero_time) / (60 * 
15), key._2, key._3, key._4, key._5, key._3 + "_" + key._4 + "_" + key._5, 
key._6, key._1, requestCount.value, returnCount.value, fill_rate, 
noVideoCount.value + videoCount.value,
expose_rate, clickCount.value, click_rate, ecpm, 
(noVideoCount.value * ecpm + videoCount.value * ecpm / 
1000.toFloat).formatted("%.2f").toFloat, created_at))
}
catch {
  case e: Exception => println(key, e)
}
  }
})
{code}


{code:java}

elements.foreach(e => {
  if ("adreq".equals(e._3)) {
    requestNum += 1
    println(key._1, requestNum)
    // The values printed here look like:
    // (key,1)
    // (key,2)
    // (key,3)
  }
})
// But the print outside the loop always shows:
// (key,0)
println(requestNum, key._1)
{code}


Who can help me? Thanks.
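
Editor's note, hedged: two details in the snippet above may explain the symptoms. First, requestNum is a primitive Scala Long, so "requestNum == null" can never be true; an empty ValueState[Long] unboxes to 0L instead. Second, state obtained from getRuntimeContext inside a ProcessWindowFunction is scoped to the key, not to the window, so it accumulates across windows unless cleared. A minimal sketch of both points, reusing the state handles declared above:

{code:java}
// An empty ValueState[Long] reads back as 0L in Scala, never null,
// so the "if (requestNum == null)" branch above is dead code.
var requestNum: Long = requestCount.value

// Count this window's "adreq" events and persist before process() returns.
requestNum += elements.count(e => "adreq".equals(e._3))
requestCount.update(requestNum)

// Optional: clear the per-key state if each window should start from zero.
// requestCount.clear()
{code}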











[jira] [Commented] (FLINK-18652) JDBCAppendTableSink to ClickHouse (data always repeating)

2020-07-21 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17161870#comment-17161870
 ] 

mzz commented on FLINK-18652:
-

[~jark]
In the UI the sink shows no data, yet sometimes data does get inserted... 
Moreover, the sink's checkpoint has always failed.

When the program runs locally, it can always insert data.
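
One failure pattern consistent with these symptoms, offered as an assumption to check rather than a diagnosis: if the sink's checkpoint keeps failing, the job restarts from the last successful checkpoint and replays the Kafka input, and an append-only JDBC sink then re-inserts the same window results. A sketch of making replays idempotent on the ClickHouse side (table name, columns, and key are illustrative):

{code:java}
// Hypothetical ClickHouse DDL, executed outside Flink: ReplacingMergeTree
// collapses rows that share the same ORDER BY key during background merges,
// so replayed window results do not accumulate as duplicates.
val ckDdl =
  """
    |CREATE TABLE ck_result (
    |    data_date  UInt64,
    |    point      UInt64,
    |    adv_code   String,
    |    income     Float32,
    |    created_at UInt64
    |) ENGINE = ReplacingMergeTree
    |ORDER BY (data_date, point, adv_code)
  """.stripMargin
{code}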

> JDBCAppendTableSink  to  ClickHouse  (data  always  repeating)
> --
>
> Key: FLINK-18652
> URL: https://issues.apache.org/jira/browse/FLINK-18652
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Critical
> Attachments: FLINK-UI.png, checkpoint-failed.png
>
>
> Hi all,
> The data stream is: Kafka -> Flink SQL -> ClickHouse.
> The window is 15 minutes, but starting 15 minutes after the first window, 
> the data keeps being written to ClickHouse repeatedly. Please help me, thanks.
> {code:java}
> // data source from kafka
> streamTableEnvironment.sqlUpdate(createTableSql)
> LOG.info("kafka source table has been created!")
> val groupTable = streamTableEnvironment.sqlQuery(tempSql)
> streamTableEnvironment.createTemporaryView("aggs_temp_table", groupTable)
> // this is the window sql, using ProcessingTime
> val re_table = streamTableEnvironment.sqlQuery(windowSql)
> re_table.printSchema()
> //groupTable.printSchema()
> val rr = streamTableEnvironment.toAppendStream[Result](re_table)
> // The data here is printed normally
> rr.print()
> streamTableEnvironment.createTemporaryView("result_table", rr)
> val s = streamTableEnvironment.sqlQuery(sql)
> // sink to clickhouse
> val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
>   .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
>   .setDBUrl(URL)
>   .setQuery(insertCKSql)
>   .setUsername(USERNAME)
>   .setPassword(PASSWORD)
>   .setBatchSize(1)
>   .setParameterTypes(
>     Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING,
>     Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.LONG,
>     Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.LONG,
>     Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG)
>   .build()
> streamTableEnvironment.registerTableSink("ckResult",
>   Array[String]("data_date", "point", "platform", "page_name",
>     "component_name", "booth_name", "position1", "advertiser", "adv_code",
>     "request_num", "return_num", "fill_rate", "expose_num", "expose_rate",
>     "click_num", "click_rate", "ecpm", "income", "created_at"),
>   Array[TypeInformation[_]](Types.LONG, Types.LONG, Types.STRING,
>     Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
>     Types.STRING, Types.LONG, Types.LONG, Types.FLOAT, Types.LONG,
>     Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT,
>     Types.LONG),
>   sink)
> // insert into TableSink
> s.insertInto("ckResult")
> {code}





[jira] [Commented] (FLINK-18652) JDBCAppendTableSink to ClickHouse (data always repeating)

2020-07-21 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17161867#comment-17161867
 ] 

mzz commented on FLINK-18652:
-

[~jark]
Yes, in the UI the sink shows no data, but the number of rows in ClickHouse 
keeps increasing all the time, which is very strange.

After I submit the job again, no data is inserted into ClickHouse after a 
window fires, yet the same code inserts into ClickHouse when run locally in 
the IDE.

> JDBCAppendTableSink  to  ClickHouse  (data  always  repeating)
> --
>
> Key: FLINK-18652
> URL: https://issues.apache.org/jira/browse/FLINK-18652
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Critical
> Attachments: FLINK-UI.png, checkpoint-failed.png
>
>
> Hi all,
> The data stream is: Kafka -> Flink SQL -> ClickHouse.
> The window is 15 minutes, but starting 15 minutes after the first window, 
> the data keeps being written to ClickHouse repeatedly. Please help me, thanks.
> {code:java}
> // data source from kafka
> streamTableEnvironment.sqlUpdate(createTableSql)
> LOG.info("kafka source table has been created!")
> val groupTable = streamTableEnvironment.sqlQuery(tempSql)
> streamTableEnvironment.createTemporaryView("aggs_temp_table", groupTable)
> // this is the window sql, using ProcessingTime
> val re_table = streamTableEnvironment.sqlQuery(windowSql)
> re_table.printSchema()
> //groupTable.printSchema()
> val rr = streamTableEnvironment.toAppendStream[Result](re_table)
> // The data here is printed normally
> rr.print()
> streamTableEnvironment.createTemporaryView("result_table", rr)
> val s = streamTableEnvironment.sqlQuery(sql)
> // sink to clickhouse
> val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
>   .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
>   .setDBUrl(URL)
>   .setQuery(insertCKSql)
>   .setUsername(USERNAME)
>   .setPassword(PASSWORD)
>   .setBatchSize(1)
>   .setParameterTypes(
>     Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING,
>     Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.LONG,
>     Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.LONG,
>     Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG)
>   .build()
> streamTableEnvironment.registerTableSink("ckResult",
>   Array[String]("data_date", "point", "platform", "page_name",
>     "component_name", "booth_name", "position1", "advertiser", "adv_code",
>     "request_num", "return_num", "fill_rate", "expose_num", "expose_rate",
>     "click_num", "click_rate", "ecpm", "income", "created_at"),
>   Array[TypeInformation[_]](Types.LONG, Types.LONG, Types.STRING,
>     Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
>     Types.STRING, Types.LONG, Types.LONG, Types.FLOAT, Types.LONG,
>     Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT,
>     Types.LONG),
>   sink)
> // insert into TableSink
> s.insertInto("ckResult")
> {code}





[jira] [Comment Edited] (FLINK-18652) JDBCAppendTableSink to ClickHouse (data always repeating)

2020-07-21 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17161751#comment-17161751
 ] 

mzz edited comment on FLINK-18652 at 7/21/20, 6:21 AM:
---

[~jark]
When I look at the UI, I find that the sink subtask has no data while the 
other operators do, and the checkpoint always fails on the sink operator.
 !FLINK-UI.png! 


was (Author: mzz_q):
When I look at the UI, I find that the sink subtask has no data while the 
other operators do, and the checkpoint always fails on the sink operator.
 !FLINK-UI.png! 

> JDBCAppendTableSink  to  ClickHouse  (data  always  repeating)
> --
>
> Key: FLINK-18652
> URL: https://issues.apache.org/jira/browse/FLINK-18652
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Critical
> Attachments: FLINK-UI.png, checkpoint-failed.png
>
>
> Hi all,
> The data stream is: Kafka -> Flink SQL -> ClickHouse.
> The window is 15 minutes, but starting 15 minutes after the first window, 
> the data keeps being written to ClickHouse repeatedly. Please help me, thanks.
> {code:java}
> // data source from kafka
> streamTableEnvironment.sqlUpdate(createTableSql)
> LOG.info("kafka source table has been created!")
> val groupTable = streamTableEnvironment.sqlQuery(tempSql)
> streamTableEnvironment.createTemporaryView("aggs_temp_table", groupTable)
> // this is the window sql, using ProcessingTime
> val re_table = streamTableEnvironment.sqlQuery(windowSql)
> re_table.printSchema()
> //groupTable.printSchema()
> val rr = streamTableEnvironment.toAppendStream[Result](re_table)
> // The data here is printed normally
> rr.print()
> streamTableEnvironment.createTemporaryView("result_table", rr)
> val s = streamTableEnvironment.sqlQuery(sql)
> // sink to clickhouse
> val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
>   .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
>   .setDBUrl(URL)
>   .setQuery(insertCKSql)
>   .setUsername(USERNAME)
>   .setPassword(PASSWORD)
>   .setBatchSize(1)
>   .setParameterTypes(
>     Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING,
>     Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.LONG,
>     Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.LONG,
>     Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG)
>   .build()
> streamTableEnvironment.registerTableSink("ckResult",
>   Array[String]("data_date", "point", "platform", "page_name",
>     "component_name", "booth_name", "position1", "advertiser", "adv_code",
>     "request_num", "return_num", "fill_rate", "expose_num", "expose_rate",
>     "click_num", "click_rate", "ecpm", "income", "created_at"),
>   Array[TypeInformation[_]](Types.LONG, Types.LONG, Types.STRING,
>     Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
>     Types.STRING, Types.LONG, Types.LONG, Types.FLOAT, Types.LONG,
>     Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT,
>     Types.LONG),
>   sink)
> // insert into TableSink
> s.insertInto("ckResult")
> {code}





[jira] [Commented] (FLINK-18652) JDBCAppendTableSink to ClickHouse (data always repeating)

2020-07-21 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17161751#comment-17161751
 ] 

mzz commented on FLINK-18652:
-

When I look at the UI, I find that the sink subtask has no data while the 
other operators do, and the checkpoint always fails on the sink operator.
 !FLINK-UI.png! 

> JDBCAppendTableSink  to  ClickHouse  (data  always  repeating)
> --
>
> Key: FLINK-18652
> URL: https://issues.apache.org/jira/browse/FLINK-18652
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Critical
> Attachments: FLINK-UI.png, checkpoint-failed.png
>
>
> Hi all,
> The data stream is: Kafka -> Flink SQL -> ClickHouse.
> The window is 15 minutes, but starting 15 minutes after the first window, 
> the data keeps being written to ClickHouse repeatedly. Please help me, thanks.
> {code:java}
> // data source from kafka
> streamTableEnvironment.sqlUpdate(createTableSql)
> LOG.info("kafka source table has been created!")
> val groupTable = streamTableEnvironment.sqlQuery(tempSql)
> streamTableEnvironment.createTemporaryView("aggs_temp_table", groupTable)
> // this is the window sql, using ProcessingTime
> val re_table = streamTableEnvironment.sqlQuery(windowSql)
> re_table.printSchema()
> //groupTable.printSchema()
> val rr = streamTableEnvironment.toAppendStream[Result](re_table)
> // The data here is printed normally
> rr.print()
> streamTableEnvironment.createTemporaryView("result_table", rr)
> val s = streamTableEnvironment.sqlQuery(sql)
> // sink to clickhouse
> val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
>   .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
>   .setDBUrl(URL)
>   .setQuery(insertCKSql)
>   .setUsername(USERNAME)
>   .setPassword(PASSWORD)
>   .setBatchSize(1)
>   .setParameterTypes(
>     Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING,
>     Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.LONG,
>     Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.LONG,
>     Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG)
>   .build()
> streamTableEnvironment.registerTableSink("ckResult",
>   Array[String]("data_date", "point", "platform", "page_name",
>     "component_name", "booth_name", "position1", "advertiser", "adv_code",
>     "request_num", "return_num", "fill_rate", "expose_num", "expose_rate",
>     "click_num", "click_rate", "ecpm", "income", "created_at"),
>   Array[TypeInformation[_]](Types.LONG, Types.LONG, Types.STRING,
>     Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
>     Types.STRING, Types.LONG, Types.LONG, Types.FLOAT, Types.LONG,
>     Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT,
>     Types.LONG),
>   sink)
> // insert into TableSink
> s.insertInto("ckResult")
> {code}





[jira] [Updated] (FLINK-18652) JDBCAppendTableSink to ClickHouse (data always repeating)

2020-07-21 Thread mzz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mzz updated FLINK-18652:

Attachment: FLINK-UI.png

> JDBCAppendTableSink  to  ClickHouse  (data  always  repeating)
> --
>
> Key: FLINK-18652
> URL: https://issues.apache.org/jira/browse/FLINK-18652
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Critical
> Attachments: FLINK-UI.png, checkpoint-failed.png
>
>
> Hi all,
> The data stream is: Kafka -> Flink SQL -> ClickHouse.
> The window is 15 minutes, but starting 15 minutes after the first window, 
> the data keeps being written to ClickHouse repeatedly. Please help me, thanks.
> {code:java}
> // data source from kafka
> streamTableEnvironment.sqlUpdate(createTableSql)
> LOG.info("kafka source table has been created!")
> val groupTable = streamTableEnvironment.sqlQuery(tempSql)
> streamTableEnvironment.createTemporaryView("aggs_temp_table", groupTable)
> // this is the window sql, using ProcessingTime
> val re_table = streamTableEnvironment.sqlQuery(windowSql)
> re_table.printSchema()
> //groupTable.printSchema()
> val rr = streamTableEnvironment.toAppendStream[Result](re_table)
> // The data here is printed normally
> rr.print()
> streamTableEnvironment.createTemporaryView("result_table", rr)
> val s = streamTableEnvironment.sqlQuery(sql)
> // sink to clickhouse
> val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
>   .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
>   .setDBUrl(URL)
>   .setQuery(insertCKSql)
>   .setUsername(USERNAME)
>   .setPassword(PASSWORD)
>   .setBatchSize(1)
>   .setParameterTypes(
>     Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING,
>     Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.LONG,
>     Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.LONG,
>     Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG)
>   .build()
> streamTableEnvironment.registerTableSink("ckResult",
>   Array[String]("data_date", "point", "platform", "page_name",
>     "component_name", "booth_name", "position1", "advertiser", "adv_code",
>     "request_num", "return_num", "fill_rate", "expose_num", "expose_rate",
>     "click_num", "click_rate", "ecpm", "income", "created_at"),
>   Array[TypeInformation[_]](Types.LONG, Types.LONG, Types.STRING,
>     Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
>     Types.STRING, Types.LONG, Types.LONG, Types.FLOAT, Types.LONG,
>     Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT,
>     Types.LONG),
>   sink)
> // insert into TableSink
> s.insertInto("ckResult")
> {code}





[jira] [Updated] (FLINK-18652) JDBCAppendTableSink to ClickHouse (data always repeating)

2020-07-21 Thread mzz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mzz updated FLINK-18652:

Attachment: checkpoint-failed.png

> JDBCAppendTableSink  to  ClickHouse  (data  always  repeating)
> --
>
> Key: FLINK-18652
> URL: https://issues.apache.org/jira/browse/FLINK-18652
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Critical
> Attachments: checkpoint-failed.png
>
>
> Hi all,
> The data stream is: Kafka -> Flink SQL -> ClickHouse.
> The window is 15 minutes, but starting 15 minutes after the first window, 
> the data keeps being written to ClickHouse repeatedly. Please help me, thanks.
> {code:java}
> // data source from kafka
> streamTableEnvironment.sqlUpdate(createTableSql)
> LOG.info("kafka source table has been created!")
> val groupTable = streamTableEnvironment.sqlQuery(tempSql)
> streamTableEnvironment.createTemporaryView("aggs_temp_table", groupTable)
> // this is the window sql, using ProcessingTime
> val re_table = streamTableEnvironment.sqlQuery(windowSql)
> re_table.printSchema()
> //groupTable.printSchema()
> val rr = streamTableEnvironment.toAppendStream[Result](re_table)
> // The data here is printed normally
> rr.print()
> streamTableEnvironment.createTemporaryView("result_table", rr)
> val s = streamTableEnvironment.sqlQuery(sql)
> // sink to clickhouse
> val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
>   .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
>   .setDBUrl(URL)
>   .setQuery(insertCKSql)
>   .setUsername(USERNAME)
>   .setPassword(PASSWORD)
>   .setBatchSize(1)
>   .setParameterTypes(
>     Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING,
>     Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.LONG,
>     Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.LONG,
>     Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG)
>   .build()
> streamTableEnvironment.registerTableSink("ckResult",
>   Array[String]("data_date", "point", "platform", "page_name",
>     "component_name", "booth_name", "position1", "advertiser", "adv_code",
>     "request_num", "return_num", "fill_rate", "expose_num", "expose_rate",
>     "click_num", "click_rate", "ecpm", "income", "created_at"),
>   Array[TypeInformation[_]](Types.LONG, Types.LONG, Types.STRING,
>     Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
>     Types.STRING, Types.LONG, Types.LONG, Types.FLOAT, Types.LONG,
>     Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT,
>     Types.LONG),
>   sink)
> // insert into TableSink
> s.insertInto("ckResult")
> {code}





[jira] [Commented] (FLINK-18652) JDBCAppendTableSink to ClickHouse (data always repeating)

2020-07-20 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17161741#comment-17161741
 ] 

mzz commented on FLINK-18652:
-

[~jark]
*tempSql:*
{code:java}
val tempSql =
  """
    |select
    |ad_id,
    |projectname,
    |data_date,
    |event_type,
    |ts,
    |event_id,
    |event_page,
    |event_component,
    |event_booth,
    |ad_type,
    |ecpm
    |FROM aggs_test
    |where projectname is not null and projectname <> '' and projectname <> ' '
    |and ad_id is not null and ad_id <> '' and ad_id <> ' '
  """.stripMargin
{code}


> JDBCAppendTableSink  to  ClickHouse  (data  always  repeating)
> --
>
> Key: FLINK-18652
> URL: https://issues.apache.org/jira/browse/FLINK-18652
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Critical
>
> Hi all,
> The data stream is: Kafka -> Flink SQL -> ClickHouse.
> The window is 15 minutes, but starting 15 minutes after the first window, 
> the data keeps being written to ClickHouse repeatedly. Please help me, thanks.
> {code:java}
> // data source from kafka
> streamTableEnvironment.sqlUpdate(createTableSql)
> LOG.info("kafka source table has been created!")
> val groupTable = streamTableEnvironment.sqlQuery(tempSql)
> streamTableEnvironment.createTemporaryView("aggs_temp_table", groupTable)
> // this is the window sql, using ProcessingTime
> val re_table = streamTableEnvironment.sqlQuery(windowSql)
> re_table.printSchema()
> //groupTable.printSchema()
> val rr = streamTableEnvironment.toAppendStream[Result](re_table)
> // The data here is printed normally
> rr.print()
> streamTableEnvironment.createTemporaryView("result_table", rr)
> val s = streamTableEnvironment.sqlQuery(sql)
> // sink to clickhouse
> val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
>   .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
>   .setDBUrl(URL)
>   .setQuery(insertCKSql)
>   .setUsername(USERNAME)
>   .setPassword(PASSWORD)
>   .setBatchSize(1)
>   .setParameterTypes(
>     Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING,
>     Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.LONG,
>     Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.LONG,
>     Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG)
>   .build()
> streamTableEnvironment.registerTableSink("ckResult",
>   Array[String]("data_date", "point", "platform", "page_name",
>     "component_name", "booth_name", "position1", "advertiser", "adv_code",
>     "request_num", "return_num", "fill_rate", "expose_num", "expose_rate",
>     "click_num", "click_rate", "ecpm", "income", "created_at"),
>   Array[TypeInformation[_]](Types.LONG, Types.LONG, Types.STRING,
>     Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
>     Types.STRING, Types.LONG, Types.LONG, Types.FLOAT, Types.LONG,
>     Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT,
>     Types.LONG),
>   sink)
> // insert into TableSink
> s.insertInto("ckResult")
> {code}





[jira] [Closed] (FLINK-18443) The operator name select: (ip, ts, count, environment.access AS access, environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the 80 characters length l

2020-07-20 Thread mzz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mzz closed FLINK-18443.
---
Fix Version/s: 1.10.0
   Resolution: Fixed

> The operator name select: (ip, ts, count, environment.access AS access, 
> environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the 
> 80 characters length limit and was truncated
> 
>
> Key: FLINK-18443
> URL: https://issues.apache.org/jira/browse/FLINK-18443
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.9.0
>Reporter: mzz
>Priority: Major
> Fix For: 1.10.0
>
>
> *Schema:*
> {code:java}
> .withSchema(new Schema()
>   .field("ip", Types.STRING())
>   .field("ts", Types.STRING())
>   .field("procTime", Types.SQL_TIMESTAMP).proctime()
>   .field("environment", schemaEnvironment)
>   .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,
>     Types.ROW(Array("count", "sid", "eventid", "params"),
>       Array[TypeInformation[_]](Types.STRING(), Types.STRING(), Types.STRING(),
>         Types.ROW(Array("adid", "adtype", "ecpm"),
>           Array[TypeInformation[_]](Types.STRING(), Types.STRING(),
>             Types.STRING())))))))
> {code}
> *when execute this sql*:
> {code:java}
> val sql =
>   """
> |SELECT
> |ip,
> |ts,
> |params.ad,
> |params.adtype,
> |eventid,
> |procTime
> |FROM aggs_test
> |CROSS JOIN UNNEST(advs) AS t (`count`,sid, eventid,params)
> |""".stripMargin
> {code}
> *I got a warning, and the console keeps printing this warning over and over, 
> with no normal output.*
> {code:java}
> 09:38:38,694 WARN  org.apache.flink.metrics.MetricGroup   
>- The operator name correlate: table(explode($cor0.advs)), select: ip, ts, 
> procTime, advs, sid, eventid, params exceeded the 80 characters length limit 
> and was truncated.
> {code}
> *But after I changed it as follows, the output works normally, although this 
> warning still appears occasionally. I changed the 'params' type from 
> Types.ROW to Types.STRING.*
> {code:java}
>  .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass, 
> Types.ROW(Array("count", "sid", "eventid", "params"),
>   Array[TypeInformation[_]](Types.STRING(), Types.STRING(), 
> Types.STRING(), Types.STRING()
> {code}





[jira] [Closed] (FLINK-18575) Failed to send data to Kafka

2020-07-20 Thread mzz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mzz closed FLINK-18575.
---
Release Note: Kafka had an error; it is not related to Flink.
  Resolution: Not A Problem

> Failed to send data to Kafka
> 
>
> Key: FLINK-18575
> URL: https://issues.apache.org/jira/browse/FLINK-18575
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Major
>
> Flink version: 1.10.0
> Kafka version: 2.2
> *code:*
> {code:java}
>  private def producerKafka(aggs_result: DataStream[String], topic: String, 
> parallelism: Int) = {
> val kafkaPro = new Properties()
> kafkaPro.setProperty("bootstrap.servers", SINK_BROKERS)
> kafkaPro.setProperty("zookeeper.connect", SINK_ZK)
> kafkaPro.setProperty("request.timeout.ms", "1")
> kafkaPro.setProperty("compression.type", "snappy")
> kafkaPro.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "")
> // Set the retries parameter: when a Kafka partition leader switch happens,
> // Flink does not restart but makes 5 retry attempts:
> kafkaPro.setProperty(ProducerConfig.RETRIES_CONFIG, "5")
> val kafka = new FlinkKafkaProducer[String](topic, new 
> ResultDtSerialization(topic), kafkaPro, 
> FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
> aggs_result.addSink(kafka).setParallelism(parallelism)
>   }
> {code}
> *When I use this code to produce to Kafka, it reports an error:*
> {code:java}
> 2020-07-13 10:25:47,624 ERROR 
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Error during 
> disposal of stream operator.
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to 
> send data to Kafka: Pending record count must be zero at this point: 1
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:651)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:562)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:480)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Pending record count must be zero 
> at this point: 1
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
>   ... 8 more
> {code}





[jira] [Created] (FLINK-18652) JDBCAppendTableSink to ClickHouse (data always repeating)

2020-07-20 Thread mzz (Jira)
mzz created FLINK-18652:
---

 Summary: JDBCAppendTableSink  to  ClickHouse  (data  always  
repeating)
 Key: FLINK-18652
 URL: https://issues.apache.org/jira/browse/FLINK-18652
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.10.0
Reporter: mzz


Hi all,
   The data stream is: Kafka -> Flink SQL -> ClickHouse.
   The window is 15 minutes, but starting 15 minutes after the first window, 
the data keeps being written to ClickHouse repeatedly. Please help me, thanks.
{code:java}
// data source from kafka
streamTableEnvironment.sqlUpdate(createTableSql)
LOG.info("kafka source table has been created!")
val groupTable = streamTableEnvironment.sqlQuery(tempSql)
streamTableEnvironment.createTemporaryView("aggs_temp_table", groupTable)
// this is the window sql, using ProcessingTime
val re_table = streamTableEnvironment.sqlQuery(windowSql)
re_table.printSchema()
//groupTable.printSchema()
val rr = streamTableEnvironment.toAppendStream[Result](re_table)
// The data here is printed normally
rr.print()

streamTableEnvironment.createTemporaryView("result_table", rr)
val s = streamTableEnvironment.sqlQuery(sql)

// sink to clickhouse
val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
  .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
  .setDBUrl(URL)
  .setQuery(insertCKSql)
  .setUsername(USERNAME)
  .setPassword(PASSWORD)
  .setBatchSize(1)
  .setParameterTypes(
    Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING,
    Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.LONG,
    Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.LONG,
    Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG)
  .build()
streamTableEnvironment.registerTableSink("ckResult",
  Array[String]("data_date", "point", "platform", "page_name",
    "component_name", "booth_name", "position1", "advertiser", "adv_code",
    "request_num", "return_num", "fill_rate", "expose_num", "expose_rate",
    "click_num", "click_rate", "ecpm", "income", "created_at"),
  Array[TypeInformation[_]](Types.LONG, Types.LONG, Types.STRING,
    Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
    Types.STRING, Types.LONG, Types.LONG, Types.FLOAT, Types.LONG,
    Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT,
    Types.LONG),
  sink)

// insert into TableSink
s.insertInto("ckResult")
{code}
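
The windowSql referenced above is not included in the report. As an illustration only (the proctime attribute name proc_time is an assumption, and the aggregates are simplified), a 15-minute processing-time tumbling window over the temporary view might look like this:

{code:java}
val windowSql =
  """
    |SELECT
    |  ad_id,
    |  COUNT(*) AS request_num,
    |  TUMBLE_END(proc_time, INTERVAL '15' MINUTE) AS window_end
    |FROM aggs_temp_table
    |GROUP BY ad_id, TUMBLE(proc_time, INTERVAL '15' MINUTE)
  """.stripMargin
{code}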








[jira] [Commented] (FLINK-18575) Failed to send data to Kafka

2020-07-20 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17161658#comment-17161658
 ] 

mzz commented on FLINK-18575:
-

The problem has been resolved; it was a Kafka error, not a Flink issue.

> Failed to send data to Kafka
> 
>
> Key: FLINK-18575
> URL: https://issues.apache.org/jira/browse/FLINK-18575
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Major
>
> Flink version: 1.10.0
> Kafka version: 2.2
> *code:*
> {code:java}
>  private def producerKafka(aggs_result: DataStream[String], topic: String, 
> parallelism: Int) = {
> val kafkaPro = new Properties()
> kafkaPro.setProperty("bootstrap.servers", SINK_BROKERS)
> kafkaPro.setProperty("zookeeper.connect", SINK_ZK)
> kafkaPro.setProperty("request.timeout.ms", "1")
> kafkaPro.setProperty("compression.type", "snappy")
> kafkaPro.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "")
> // Set the retries parameter: when a Kafka partition leader switch happens,
> // Flink does not restart but makes 5 retry attempts:
> kafkaPro.setProperty(ProducerConfig.RETRIES_CONFIG, "5")
> val kafka = new FlinkKafkaProducer[String](topic, new 
> ResultDtSerialization(topic), kafkaPro, 
> FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
> aggs_result.addSink(kafka).setParallelism(parallelism)
>   }
> {code}
> *When I use this code to produce to Kafka, it reports an error:*
> {code:java}
> 2020-07-13 10:25:47,624 ERROR 
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Error during 
> disposal of stream operator.
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to 
> send data to Kafka: Pending record count must be zero at this point: 1
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:651)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:562)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:480)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Pending record count must be zero 
> at this point: 1
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
>   ... 8 more
> {code}





[jira] [Commented] (FLINK-18575) Failed to send data to Kafka

2020-07-12 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17156456#comment-17156456
 ] 

mzz commented on FLINK-18575:
-

Additional information:
The application was running and messages were produced normally, but the log 
kept reporting this error.

> Failed to send data to Kafka
> 
>
> Key: FLINK-18575
> URL: https://issues.apache.org/jira/browse/FLINK-18575
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Major
>
> Flink version: 1.10.0
> Kafka version: 2.2
> *code:*
> {code:java}
>  private def producerKafka(aggs_result: DataStream[String], topic: String, 
> parallelism: Int) = {
> val kafkaPro = new Properties()
> kafkaPro.setProperty("bootstrap.servers", SINK_BROKERS)
> kafkaPro.setProperty("zookeeper.connect", SINK_ZK)
> kafkaPro.setProperty("request.timeout.ms", "1")
> kafkaPro.setProperty("compression.type", "snappy")
> kafkaPro.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "")
> // Set the retries parameter: when a Kafka partition leader switch happens,
> // Flink does not restart but makes 5 retry attempts:
> kafkaPro.setProperty(ProducerConfig.RETRIES_CONFIG, "5")
> val kafka = new FlinkKafkaProducer[String](topic, new 
> ResultDtSerialization(topic), kafkaPro, 
> FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
> aggs_result.addSink(kafka).setParallelism(parallelism)
>   }
> {code}
> *When I use this code to produce to Kafka, it reports an error:*
> {code:java}
> 2020-07-13 10:25:47,624 ERROR 
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Error during 
> disposal of stream operator.
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to 
> send data to Kafka: Pending record count must be zero at this point: 1
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:651)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:562)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:480)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Pending record count must be zero 
> at this point: 1
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
>   ... 8 more
> {code}





[jira] [Created] (FLINK-18575) Failed to send data to Kafka

2020-07-12 Thread mzz (Jira)
mzz created FLINK-18575:
---

 Summary: Failed to send data to Kafka
 Key: FLINK-18575
 URL: https://issues.apache.org/jira/browse/FLINK-18575
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.10.0
Reporter: mzz


Flink version: 1.10.0
Kafka version: 2.2

*code:*
{code:java}
 private def producerKafka(aggs_result: DataStream[String], topic: String, 
parallelism: Int) = {
val kafkaPro = new Properties()
kafkaPro.setProperty("bootstrap.servers", SINK_BROKERS)
kafkaPro.setProperty("zookeeper.connect", SINK_ZK)
kafkaPro.setProperty("request.timeout.ms", "1")
kafkaPro.setProperty("compression.type", "snappy")
kafkaPro.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "")
// Set the retries parameter: when a Kafka partition leader switch happens,
// Flink does not restart but makes 5 retry attempts:
kafkaPro.setProperty(ProducerConfig.RETRIES_CONFIG, "5")
val kafka = new FlinkKafkaProducer[String](topic, new 
ResultDtSerialization(topic), kafkaPro, 
FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
aggs_result.addSink(kafka).setParallelism(parallelism)
  }
{code}

*When I use this code to produce to Kafka, it reports an error:*
{code:java}
2020-07-13 10:25:47,624 ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask   - Error during 
disposal of stream operator.
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send 
data to Kafka: Pending record count must be zero at this point: 1
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:651)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:562)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:480)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Pending record count must be zero 
at this point: 1
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
... 8 more
{code}
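
A note on semantics, offered as background rather than a diagnosis: FlinkKafkaProducer's AT_LEAST_ONCE guarantee works by flushing all pending records when a checkpoint is taken, so the job should run with checkpointing enabled. A minimal sketch (the interval value is illustrative):

{code:java}
import org.apache.flink.streaming.api.scala._

// Sketch: enable periodic checkpoints so the producer's at-least-once
// flush-on-checkpoint behavior actually takes effect.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60 * 1000) // checkpoint every 60 seconds
{code}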






[jira] [Comment Edited] (FLINK-18511) Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)

2020-07-08 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17153290#comment-17153290
 ] 

mzz edited comment on FLINK-18511 at 7/8/20, 10:16 AM:
---

[~jark]
It's OK now.
Thanks.

{code:java}
 ts AS TO_TIMESTAMP(FROM_UNIXTIME(server_timestamp, 'yyyy-MM-dd HH:mm:ss'))

{code}
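
For the sample value from the comment below (server_timestamp = 1593737902), the expression resolves roughly as sketched here; the exact wall-clock string depends on the session time zone, and UTC is assumed:

{code:java}
-- Sketch, assuming a UTC session time zone:
-- FROM_UNIXTIME(1593737902, 'yyyy-MM-dd HH:mm:ss') -> '2020-07-03 00:58:22'
-- TO_TIMESTAMP('2020-07-03 00:58:22')              -> a TIMESTAMP(3) value usable as rowtime
ts AS TO_TIMESTAMP(FROM_UNIXTIME(server_timestamp, 'yyyy-MM-dd HH:mm:ss'))
{code}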




was (Author: mzz_q):
[~jark]
Sorry, can you show me how to use this function? This excerpt is from the Flink website:
{code:java}
Converts date time string string1 with format string2 (by default: 
'yyyy-MM-dd HH:mm:ss') under the session time zone (specified by TableConfig) 
to a timestamp.
{code}

and my server_timestamp looks like '1593737902'. Thanks.


> Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> --
>
> Key: FLINK-18511
> URL: https://issues.apache.org/jira/browse/FLINK-18511
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Critical
>
> I use a Schema like this:
> {code:java}
> val schema = new Schema()
> .field("rowtimes", DataTypes.TIME(3)).rowtime(new 
> Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))
> {code}
> sql:
> {code:java}
> val groupSql =
> """
>   |select access,
>   |count(access),
>   |TUMBLE_END(rowtimes,INTERVAL '10' second)
>   |from log_test_table
>   |group by
>   |access,TUMBLE_END(rowtimes,INTERVAL '10' second)
> """.stripMargin
> {code}
> report errors:
> {code:java}
> Exception in thread "main" 
> org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: 
> TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL) 
> If you think this function should be supported, you can create an issue and 
> start a discussion for it.
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:179)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
>   at 
> 

[jira] [Issue Comment Deleted] (FLINK-18511) Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)

2020-07-08 Thread mzz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mzz updated FLINK-18511:

Comment: was deleted

(was: [~jark]
my code is this :

{code:java}
app_time BIGINT, -- 13-digit timestamp (1587975971)
   ts AS TO_TIMESTAMP(FROM_UNIXTIME(app_time , 'yyyy-MM-dd HH:mm:ss')), -- define the event time
   WATERMARK FOR ts AS ts - INTERVAL '5' SECOND   -- define a watermark with a 5 second delay on ts

{code}
it always reported this error:

{code:java}
org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered 
"count ," at line 1, column 562.
{code}

)

> Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> --
>
> Key: FLINK-18511
> URL: https://issues.apache.org/jira/browse/FLINK-18511
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Critical
>
> I use Schema like this :
> {code:java}
> val schema = new Schema()
> .field("rowtimes", DataTypes.TIME(3)).rowtime(new 
> Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))
> {code}
> sql:
> {code:java}
> val groupSql =
> """
>   |select access,
>   |count(access),
>   |TUMBLE_END(rowtimes,INTERVAL '10' second)
>   |from log_test_table
>   |group by
>   |access,TUMBLE_END(rowtimes,INTERVAL '10' second)
> """.stripMargin
> {code}
> report errors:
> {code:java}
> Exception in thread "main" 
> org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: 
> TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL) 
> If you think this function should be supported, you can create an issue and 
> start a discussion for it.
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:179)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
> 

[jira] [Commented] (FLINK-18511) Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)

2020-07-08 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17153458#comment-17153458
 ] 

mzz commented on FLINK-18511:
-

[~jark]
my code is this :

{code:java}
app_time BIGINT, -- 13-digit timestamp (1587975971)
   ts AS TO_TIMESTAMP(FROM_UNIXTIME(app_time , 'yyyy-MM-dd HH:mm:ss')), -- define the event time
   WATERMARK FOR ts AS ts - INTERVAL '5' SECOND   -- define a watermark with a 5 second delay on ts

{code}
it always reported this error:

{code:java}
org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered 
"count ," at line 1, column 562.
{code}



> Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> --
>
> Key: FLINK-18511
> URL: https://issues.apache.org/jira/browse/FLINK-18511
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Critical
>
> I use Schema like this :
> {code:java}
> val schema = new Schema()
> .field("rowtimes", DataTypes.TIME(3)).rowtime(new 
> Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))
> {code}
> sql:
> {code:java}
> val groupSql =
> """
>   |select access,
>   |count(access),
>   |TUMBLE_END(rowtimes,INTERVAL '10' second)
>   |from log_test_table
>   |group by
>   |access,TUMBLE_END(rowtimes,INTERVAL '10' second)
> """.stripMargin
> {code}
> report errors:
> {code:java}
> Exception in thread "main" 
> org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: 
> TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL) 
> If you think this function should be supported, you can create an issue and 
> start a discussion for it.
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:179)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
>   at 
> 

[jira] [Commented] (FLINK-18511) Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)

2020-07-08 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17153290#comment-17153290
 ] 

mzz commented on FLINK-18511:
-

[~jark]
Sorry, can you show me how to use this function? This is excerpted from the Flink website:
{code:java}
Converts date time string string1 with format string2 (by default: 
'yyyy-MM-dd HH:mm:ss') under the session time zone (specified by TableConfig) 
to a timestamp.
{code}

and my server_timestamp looks like '1593737902', thanks
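
For reference, a small worked sketch of that function on an epoch-seconds value 
like the one above (the millisecond variant is an assumption about 13-digit 
inputs):

{code:java}
-- FROM_UNIXTIME expects epoch seconds; for 1593737902 it yields a
-- 'yyyy-MM-dd HH:mm:ss' string that TO_TIMESTAMP can then parse:
SELECT TO_TIMESTAMP(FROM_UNIXTIME(1593737902, 'yyyy-MM-dd HH:mm:ss'))
-- For a 13-digit millisecond timestamp, divide by 1000 first:
-- TO_TIMESTAMP(FROM_UNIXTIME(app_time / 1000, 'yyyy-MM-dd HH:mm:ss'))
{code}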


> Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> --
>
> Key: FLINK-18511
> URL: https://issues.apache.org/jira/browse/FLINK-18511
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Critical
>
> I use Schema like this :
> {code:java}
> val schema = new Schema()
> .field("rowtimes", DataTypes.TIME(3)).rowtime(new 
> Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))
> {code}
> sql:
> {code:java}
> val groupSql =
> """
>   |select access,
>   |count(access),
>   |TUMBLE_END(rowtimes,INTERVAL '10' second)
>   |from log_test_table
>   |group by
>   |access,TUMBLE_END(rowtimes,INTERVAL '10' second)
> """.stripMargin
> {code}
> report errors:
> {code:java}
> Exception in thread "main" 
> org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: 
> TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL) 
> If you think this function should be supported, you can create an issue and 
> start a discussion for it.
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:179)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
>   at 
> 

[jira] [Commented] (FLINK-18511) Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)

2020-07-08 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17153256#comment-17153256
 ] 

mzz commented on FLINK-18511:
-

[~jark]
I found the cause: my `server_timestamp` looks like 1593737902, so it can't be 
used as TIMESTAMP(3) directly. So I coded it like this: server_timestamp AS 
PROCTIME(). But it caused an error:

{code:java}
org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered 
"count ," at line 1, column 562.
{code}
When I declare server_timestamp as VARCHAR, I can't use a watermark.
How can I convert VARCHAR to TIMESTAMP(3)? Thanks.
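
One possible conversion, sketched here rather than confirmed (it assumes the 
VARCHAR holds epoch seconds): cast to BIGINT, format with FROM_UNIXTIME, parse 
with TO_TIMESTAMP, and declare the watermark on the computed column:

{code:java}
server_timestamp VARCHAR,
ts AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(server_timestamp AS BIGINT), 'yyyy-MM-dd HH:mm:ss')),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
{code}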



> Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> --
>
> Key: FLINK-18511
> URL: https://issues.apache.org/jira/browse/FLINK-18511
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Critical
>
> I use Schema like this :
> {code:java}
> val schema = new Schema()
> .field("rowtimes", DataTypes.TIME(3)).rowtime(new 
> Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))
> {code}
> sql:
> {code:java}
> val groupSql =
> """
>   |select access,
>   |count(access),
>   |TUMBLE_END(rowtimes,INTERVAL '10' second)
>   |from log_test_table
>   |group by
>   |access,TUMBLE_END(rowtimes,INTERVAL '10' second)
> """.stripMargin
> {code}
> report errors:
> {code:java}
> Exception in thread "main" 
> org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: 
> TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL) 
> If you think this function should be supported, you can create an issue and 
> start a discussion for it.
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:179)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
>   at 
> 

[jira] [Comment Edited] (FLINK-18511) Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)

2020-07-08 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17153256#comment-17153256
 ] 

mzz edited comment on FLINK-18511 at 7/8/20, 6:13 AM:
--

[~jark]
I found the cause: my `server_timestamp` looks like 1593737902, so it can't be 
used as TIMESTAMP(3) directly. So I coded it like this: server_timestamp AS 
PROCTIME(). But it caused an error:

{code:java}
org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered 
"count ," at line 1, column 562.
{code}
When I declare server_timestamp as VARCHAR, I can't use a watermark.
How can I convert VARCHAR to TIMESTAMP(3)? Thanks.




was (Author: mzz_q):
[~jark]
I found the cause: my `server_timestamp` looks like 1593737902, so it can't be 
used as TIMESTAMP(3) directly. So I coded it like this: server_timestamp AS 
PROCTIME(). But it caused an error:

{code:java}
org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered 
"count ," at line 1, column 562.
{code}
When I declare server_timestamp as VARCHAR, I can't use a watermark.
How can I convert VARCHAR to TIMESTAMP(3)? Thanks.



> Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> --
>
> Key: FLINK-18511
> URL: https://issues.apache.org/jira/browse/FLINK-18511
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Critical
>
> I use Schema like this :
> {code:java}
> val schema = new Schema()
> .field("rowtimes", DataTypes.TIME(3)).rowtime(new 
> Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))
> {code}
> sql:
> {code:java}
> val groupSql =
> """
>   |select access,
>   |count(access),
>   |TUMBLE_END(rowtimes,INTERVAL '10' second)
>   |from log_test_table
>   |group by
>   |access,TUMBLE_END(rowtimes,INTERVAL '10' second)
> """.stripMargin
> {code}
> report errors:
> {code:java}
> Exception in thread "main" 
> org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: 
> TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL) 
> If you think this function should be supported, you can create an issue and 
> start a discussion for it.
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:179)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>   at 
> 

[jira] [Commented] (FLINK-18443) The operator name select: (ip, ts, count, environment.access AS access, environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the 80 characters lengt

2020-07-07 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17153207#comment-17153207
 ] 

mzz commented on FLINK-18443:
-

I replaced Types with DataTypes, and the problem is solved.
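
A sketch of the change described, shown on the simple fields from the schema 
above (the nested advs field is omitted for brevity):

{code:java}
// Legacy Types factory replaced by the DataTypes factory:
.withSchema(new Schema()
  .field("ip", DataTypes.STRING())
  .field("ts", DataTypes.STRING())
  .field("procTime", DataTypes.TIMESTAMP(3)).proctime())
{code}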

> The operator name select: (ip, ts, count, environment.access AS access, 
> environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the 
> 80 characters length limit and was truncated
> 
>
> Key: FLINK-18443
> URL: https://issues.apache.org/jira/browse/FLINK-18443
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.9.0
>Reporter: mzz
>Priority: Major
>
> *Schema:*
> {code:java}
> .withSchema(new Schema()
>  .field("ip", Types.STRING())
>  .field("ts", Types.STRING())
>  .field("procTime", Types.SQL_TIMESTAMP).proctime()
>  .field("environment", schemaEnvironment)
>  .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass, 
>   Types.ROW(Array("count","sid", "eventid","params"), 
>   Array[TypeInformation[_]](Types.STRING(),Types.STRING(), 
>   
> Types.STRING(),Types.ROW(Array("adid","adtype","ecpm"),Array[TypeInformation[_]]
>  (Types.STRING(),Types.STRING(),Types.STRING()))
> )
> {code}
> *when execute this sql*:
> {code:java}
> val sql =
>   """
> |SELECT
> |ip,
> |ts,
> |params.ad,
> |params.adtype,
> |eventid,
> |procTime
> |FROM aggs_test
> |CROSS JOIN UNNEST(advs) AS t (`count`,sid, eventid,params)
> |""".stripMargin
> {code}
> *I got a warning, the console keeps printing this warning, and there is no 
> normal output*
> {code:java}
> 09:38:38,694 WARN  org.apache.flink.metrics.MetricGroup   
>- The operator name correlate: table(explode($cor0.advs)), select: ip, ts, 
> procTime, advs, sid, eventid, params exceeded the 80 characters length limit 
> and was truncated.
> {code}
> *But after I changed it as follows, although this warning still appears 
> occasionally, the output works normally. I changed the 'params' type from 
> Types.ROW to Types.STRING.*
> {code:java}
>  .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass, 
> Types.ROW(Array("count", "sid", "eventid", "params"),
>   Array[TypeInformation[_]](Types.STRING(), Types.STRING(), 
> Types.STRING(), Types.STRING()
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18511) Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)

2020-07-07 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17153205#comment-17153205
 ] 

mzz commented on FLINK-18511:
-

Thanks [~jark].
I use DDL to register the table; the CREATE TABLE SQL:
{code:java}
val createTableSql =
"""
  |CREATE TABLE aggs_test(
  | data_date VARCHAR,
  | data_min VARCHAR,
  | data_hour VARCHAR,
  | server_timestamp TIMESTAMP(3),
  | access VARCHAR,
  | WATERMARK FOR server_timestamp AS server_timestamp - INTERVAL '5' SECOND
  |)
  |WITH(
  | 'connector.type' ='kafka',
  | 'connector.version' = 'universal',
  | 'connector.topic' = 'xxx',
  | 'connector.startup-mode' = 'earliest-offset',
  | 'connector.properties.0.key' = 'bootstrap.servers',
  | 'connector.properties.0.value' = 'xxx',
  | 'update-mode' = 'append',
  |  'format.type' = 'json',
  |  'format.derive-schema' = 'true'
  |)
""".stripMargin
{code}

groupBySql:

{code:java}
val groupSql =
"""
  |select access,
  |TUMBLE_START(server_timestamp,INTERVAL '10' second),
  |count(ad_id)
  |from aggs_test
  |group by
  |TUMBLE(server_timestamp ,INTERVAL '10' second),access
""".stripMargin
{code}

error:

{code:java}
WARN  org.apache.flink.streaming.runtime.tasks.StreamTask   - Error 
while canceling task.
org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
at 
org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
at 
org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:602)
at 
org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1355)
at java.lang.Thread.run(Thread.java:748)
{code}




> Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> --
>
> Key: FLINK-18511
> URL: https://issues.apache.org/jira/browse/FLINK-18511
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Critical
>
> I use Schema like this :
> {code:java}
> val schema = new Schema()
> .field("rowtimes", DataTypes.TIME(3)).rowtime(new 
> Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))
> {code}
> sql:
> {code:java}
> val groupSql =
> """
>   |select access,
>   |count(access),
>   |TUMBLE_END(rowtimes,INTERVAL '10' second)
>   |from log_test_table
>   |group by
>   |access,TUMBLE_END(rowtimes,INTERVAL '10' second)
> """.stripMargin
> {code}
> report errors:
> {code:java}
> Exception in thread "main" 
> org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: 
> TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL) 
> If you think this function should be supported, you can create an issue and 
> start a discussion for it.
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
>   at 
> 

[jira] [Commented] (FLINK-18511) Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)

2020-07-07 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17152617#comment-17152617
 ] 

mzz commented on FLINK-18511:
-

[~jark]
my create table SQL:

{code:java}
val createTableSql =
"""
  |CREATE TABLE aggs_test(
  | data_date VARCHAR,
  | data_min VARCHAR,
  | data_hour VARCHAR,
  | server_timestamp BIGINT,
  | access VARCHAR,
  | appversion VARCHAR
  |)
  |WITH(
  | 'connector.type' ='kafka',
  | 'connector.version' = 'universal',
  | 'connector.topic' = 'xxx',
  | 'connector.startup-mode' = 'earliest-offset',
  | 'connector.properties.0.key' = 'bootstrap.servers',
  | 'connector.properties.0.value' = '',
  | 'update-mode' = 'append',
  |  'format.type' = 'json',
  |  'format.derive-schema' = 'true'
  |)
""".stripMargin
{code}


> Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> --
>
> Key: FLINK-18511
> URL: https://issues.apache.org/jira/browse/FLINK-18511
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Critical
>
> I use Schema like this :
> {code:java}
> val schema = new Schema()
> .field("rowtimes", DataTypes.TIME(3)).rowtime(new 
> Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))
> {code}
> sql:
> {code:java}
> val groupSql =
> """
>   |select access,
>   |count(access),
>   |TUMBLE_END(rowtimes,INTERVAL '10' second)
>   |from log_test_table
>   |group by
>   |access,TUMBLE_END(rowtimes,INTERVAL '10' second)
> """.stripMargin
> {code}
> report errors:
> {code:java}
> Exception in thread "main" 
> org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: 
> TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL) 
> If you think this function should be supported, you can create an issue and 
> start a discussion for it.
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:179)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>   at 
> 

[jira] [Commented] (FLINK-18511) Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)

2020-07-07 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17152615#comment-17152615
 ] 

mzz commented on FLINK-18511:
-

[~jark]
Thanks, Jark Wu.
I tried to use DDL to register the table, but the error still occurs.
My SQL is quite simple:

{code:java}
val groupSql =
"""
  |select access,
  |count(access),
  |TUMBLE_END(CAST(server_timestamp as TIMESTAMP(3)),INTERVAL '10' second)
  |from aggs_test
  |group by
  |access,TUMBLE_END(CAST(server_timestamp as TIMESTAMP(3)),INTERVAL '10' second)
""".stripMargin

{code}

error:
{code:java}
Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: 
Unsupported call: TUMBLE_END 
If you think this function should be supported, you can create an issue and 
start a discussion for it.
{code}
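
For comparison, the usual windowed-aggregation shape (a sketch, not a confirmed 
fix): group by the TUMBLE window itself and call TUMBLE_END only in the select 
list:

{code:java}
SELECT access,
       COUNT(access),
       TUMBLE_END(rowtimes, INTERVAL '10' SECOND)
FROM log_test_table
GROUP BY access, TUMBLE(rowtimes, INTERVAL '10' SECOND)
{code}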



> Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> --
>
> Key: FLINK-18511
> URL: https://issues.apache.org/jira/browse/FLINK-18511
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Critical
>
> I use Schema like this :
> {code:java}
> val schema = new Schema()
> .field("rowtimes", DataTypes.TIME(3)).rowtime(new 
> Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))
> {code}
> sql:
> {code:java}
> val groupSql =
> """
>   |select access,
>   |count(access),
>   |TUMBLE_END(rowtimes,INTERVAL '10' second)
>   |from log_test_table
>   |group by
>   |access,TUMBLE_END(rowtimes,INTERVAL '10' second)
> """.stripMargin
> {code}
> report errors:
> {code:java}
> Exception in thread "main" 
> org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: 
> TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL) 
> If you think this function should be supported, you can create an issue and 
> start a discussion for it.
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:179)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>   at 
> 

[jira] [Commented] (FLINK-18511) Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)

2020-07-07 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17152544#comment-17152544
 ] 

mzz commented on FLINK-18511:
-

[~jark]
Thanks for your reply, but I changed the schema field to:

{code:java}
.field("rowtimes", DataTypes.TIMESTAMP(3)).rowtime(new 
Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))

{code}

it always reports this error:

{code:java}
Exception in thread "main" 
org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: 
TUMBLE_END(TIMESTAMP(3), INTERVAL SECOND(3) NOT NULL) 
If you think this function should be supported, you can create an issue and 
start a discussion for it.
{code}



> Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)
> --
>
> Key: FLINK-18511
> URL: https://issues.apache.org/jira/browse/FLINK-18511
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: mzz
>Priority: Critical
>
> I use Schema like this :
> {code:java}
> val schema = new Schema()
> .field("rowtimes", DataTypes.TIME(3)).rowtime(new 
> Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))
> {code}
> sql:
> {code:java}
> val groupSql =
> """
>   |select access,
>   |count(access),
>   |TUMBLE_END(rowtimes,INTERVAL '10' second)
>   |from log_test_table
>   |group by
>   |access,TUMBLE_END(rowtimes,INTERVAL '10' second)
> """.stripMargin
> {code}
> report errors:
> {code:java}
> Exception in thread "main" 
> org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: 
> TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL) 
> If you think this function should be supported, you can create an issue and 
> start a discussion for it.
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
>   at 
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:179)
>   at 
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>   at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
>   at 
> 

[jira] [Created] (FLINK-18511) Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL)

2020-07-07 Thread mzz (Jira)
mzz created FLINK-18511:
---

 Summary: Unsupported call: TUMBLE_END(TIME(0), INTERVAL SECOND(3) 
NOT NULL)
 Key: FLINK-18511
 URL: https://issues.apache.org/jira/browse/FLINK-18511
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.10.0
Reporter: mzz


I use a Schema like this:
{code:java}
val schema = new Schema()
.field("rowtimes", DataTypes.TIME(3)).rowtime(new 
Rowtime().timestampsFromField("ts").watermarksPeriodicBounded(6000))

{code}

sql:

{code:java}
val groupSql =
"""
  |select access,
  |count(access),
  |TUMBLE_END(rowtimes,INTERVAL '10' second)
  |from log_test_table
  |group by
  |access,TUMBLE_END(rowtimes,INTERVAL '10' second)
""".stripMargin
{code}

report errors:



{code:java}
Exception in thread "main" 
org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: 
TUMBLE_END(TIME(0), INTERVAL SECOND(3) NOT NULL) 
If you think this function should be supported, you can create an issue and 
start a discussion for it.
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5$$anonfun$apply$2.apply(ExprCodeGenerator.scala:792)
at scala.Option.getOrElse(Option.scala:121)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:791)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$generateCallExpression$5.apply(ExprCodeGenerator.scala:796)
at scala.Option.getOrElse(Option.scala:121)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:785)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:485)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:152)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:152)
at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:179)
at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:49)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:77)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:139)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:55)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at 

[jira] [Commented] (FLINK-18443) The operator name select: (ip, ts, count, environment.access AS access, environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the 80 characters lengt

2020-06-28 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17147494#comment-17147494
 ] 

mzz commented on FLINK-18443:
-

I tried to read the source code, but it didn't help me, because the parameter 
METRICS_OPERATOR_NAME_MAX_LENGTH is final.

{code:java}
static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80;

public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
    if (name != null && name.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
        LOG.warn("The operator name {} exceeded the {} characters length limit and was truncated.",
            name, METRICS_OPERATOR_NAME_MAX_LENGTH);
        name = name.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
    }
    OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, operatorID, name);
    // unique OperatorIDs only exist in streaming, so we have to rely on the name for batch operators
    final String key = operatorID + name;

    synchronized (this) {
        OperatorMetricGroup previous = operators.put(key, operator);
        if (previous == null) {
            // no operator group so far
            return operator;
        } else {
            // already had an operator group. restore that one.
            operators.put(key, previous);
            return previous;
        }
    }
}

{code}



> The operator name select: (ip, ts, count, environment.access AS access, 
> environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the 
> 80 characters length limit and was truncated
> 
>
> Key: FLINK-18443
> URL: https://issues.apache.org/jira/browse/FLINK-18443
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.9.0
>Reporter: mzz
>Priority: Major
>
> *Schema:*
> {code:java}
> .withSchema(new Schema()
>  .field("ip", Types.STRING())
>  .field("ts", Types.STRING())
>  .field("procTime", Types.SQL_TIMESTAMP).proctime()
>  .field("environment", schemaEnvironment)
>  .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass, 
>   Types.ROW(Array("count","sid", "eventid","params"), 
>   Array[TypeInformation[_]](Types.STRING(),Types.STRING(), 
>   
> Types.STRING(),Types.ROW(Array("adid","adtype","ecpm"),Array[TypeInformation[_]]
>  (Types.STRING(),Types.STRING(),Types.STRING()))
> )
> {code}
> *when execute this sql*:
> {code:java}
> val sql =
>   """
> |SELECT
> |ip,
> |ts,
> |params.ad,
> |params.adtype,
> |eventid,
> |procTime
> |FROM aggs_test
> |CROSS JOIN UNNEST(advs) AS t (`count`,sid, eventid,params)
> |""".stripMargin
> {code}
> *I got a warning,and the console keeps brushing this warning,no normal 
> printout*
> {code:java}
> 09:38:38,694 WARN  org.apache.flink.metrics.MetricGroup   
>- The operator name correlate: table(explode($cor0.advs)), select: ip, ts, 
> procTime, advs, sid, eventid, params exceeded the 80 characters length limit 
> and was truncated.
> {code}
> *But after I change it to this way, although I occasionally brush this Warn, 
> it can be output normally。I change the 'params' type from Types.ROW to 
> Types.STRING*。
> {code:java}
>  .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass, 
> Types.ROW(Array("count", "sid", "eventid", "params"),
>   Array[TypeInformation[_]](Types.STRING(), Types.STRING(), 
> Types.STRING(), Types.STRING()
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18437) org.apache.calcite.sql.validate.SqlValidatorException: List of column aliases must have same degree as table

2020-06-28 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17147492#comment-17147492
 ] 

mzz commented on FLINK-18437:
-

*I tried to read the source code, but it didn't help me, because 
METRICS_OPERATOR_NAME_MAX_LENGTH is final.*

{code:java}
static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80;

public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
    if (name != null && name.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
        LOG.warn("The operator name {} exceeded the {} characters length limit and was truncated.",
            name, METRICS_OPERATOR_NAME_MAX_LENGTH);
        name = name.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
    }
    OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, operatorID, name);
    // unique OperatorIDs only exist in streaming, so we have to rely on the name for batch operators
    final String key = operatorID + name;

    synchronized (this) {
        OperatorMetricGroup previous = operators.put(key, operator);
        if (previous == null) {
            // no operator group so far
            return operator;
        } else {
            // already had an operator group. restore that one.
            operators.put(key, previous);
            return previous;
        }
    }
}
{code}


> org.apache.calcite.sql.validate.SqlValidatorException: List of column aliases 
> must have same degree as table
> 
>
> Key: FLINK-18437
> URL: https://issues.apache.org/jira/browse/FLINK-18437
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.9.3
>Reporter: mzz
>Priority: Critical
>
>  .withSchema(new Schema()
> .field("ip", Types.STRING())
> .field("ts", Types.STRING())
> .field("environment", Types.ROW(Array("access", "brand"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING)))
> .field("advs", ObjectArrayTypeInfo.getInfoFor(new 
> Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING
>   )
>   .inAppendMode()
>   .registerTableSource("aggs_test")
> The code above is dataSchema,i tried this way  
> [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but
>   when i execute this sql: 
> val sql1 =
>   """
> |SELECT
> |ip,
> |ts,
> |environment,
> |adv
> |FROM aggs_test
> |CROSS JOIN UNNEST(advs) AS t (adv)
> |""".stripMargin
> It report an error:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 8, column 31 to line 8, column 33: List of 
> column aliases must have same degree as table; table has 2 columns ('access', 
> 'brand'), whereas alias list has 1 columns
>   at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128)
>   at 
> org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83)
>   at 
> org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, 
> column 31 to line 8, column 33: List of column aliases must have same degree 
> as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 
> columns
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
>   at 
> 

[jira] [Created] (FLINK-18443) The operator name select: (ip, ts, count, environment.access AS access, environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the 80 characters length

2020-06-28 Thread mzz (Jira)
mzz created FLINK-18443:
---

 Summary: The operator name select: (ip, ts, count, 
environment.access AS access, environment.brand AS brand, sid, params.adid AS 
adid, eventid) exceeded the 80 characters length limit and was truncated
 Key: FLINK-18443
 URL: https://issues.apache.org/jira/browse/FLINK-18443
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.9.0
Reporter: mzz


*Schema:*

{code:java}
.withSchema(new Schema()
  .field("ip", Types.STRING())
  .field("ts", Types.STRING())
  .field("procTime", Types.SQL_TIMESTAMP).proctime()
  .field("environment", schemaEnvironment)
  .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,
    Types.ROW(
      Array("count", "sid", "eventid", "params"),
      Array[TypeInformation[_]](
        Types.STRING(), Types.STRING(), Types.STRING(),
        Types.ROW(
          Array("adid", "adtype", "ecpm"),
          Array[TypeInformation[_]](Types.STRING(), Types.STRING(), Types.STRING()))))))
)
{code}

*when execute this sql*:

{code:java}
val sql =
  """
|SELECT
|ip,
|ts,
|params.ad,
|params.adtype,
|eventid,
|procTime
|FROM aggs_test
|CROSS JOIN UNNEST(advs) AS t (`count`,sid, eventid,params)
|""".stripMargin
{code}

*I got a warning; the console keeps flooding with it, and there is no normal printout.*

{code:java}
09:38:38,694 WARN  org.apache.flink.metrics.MetricGroup - The operator name correlate: table(explode($cor0.advs)), select: ip, ts, procTime, advs, sid, eventid, params exceeded the 80 characters length limit and was truncated.
{code}

*But after changing it as follows, the warning still appears occasionally, yet the output works normally. I changed the 'params' type from Types.ROW to Types.STRING.*

{code:java}
.field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,
  Types.ROW(
    Array("count", "sid", "eventid", "params"),
    Array[TypeInformation[_]](Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()))))
{code}
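
With params flattened to a plain STRING, its nested fields can still be pulled out at query time. A minimal sketch of one way to do that, using a hypothetical scalar UDF (the class, the registered name, and the regex are illustrative, not part of the original job):

{code:java}
import org.apache.flink.table.functions.ScalarFunction

// Hypothetical UDF: naive regex extraction of the "adid" field
// from the params JSON string.
class ExtractAdid extends ScalarFunction {
  private val AdidPattern = """"adid"\s*:\s*"([^"]*)"""".r

  def eval(params: String): String =
    if (params == null) null
    else AdidPattern.findFirstMatchIn(params).map(_.group(1)).orNull
}

// Registration and use, assuming tableEnv is the job's StreamTableEnvironment:
//   tableEnv.registerFunction("extract_adid", new ExtractAdid)
//   SELECT extract_adid(params) AS adid FROM aggs_test
//   CROSS JOIN UNNEST(advs) AS t (`count`, sid, eventid, params)
{code}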


--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18437) org.apache.calcite.sql.validate.SqlValidatorException: List of column aliases must have same degree as table

2020-06-28 Thread mzz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mzz updated FLINK-18437:

Summary: org.apache.calcite.sql.validate.SqlValidatorException: List of 
column aliases must have same degree as table  (was: Error message is not 
correct when using UNNEST)

> org.apache.calcite.sql.validate.SqlValidatorException: List of column aliases 
> must have same degree as table
> 
>
> Key: FLINK-18437
> URL: https://issues.apache.org/jira/browse/FLINK-18437
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.9.3
>Reporter: mzz
>Priority: Critical
>
>  .withSchema(new Schema()
> .field("ip", Types.STRING())
> .field("ts", Types.STRING())
> .field("environment", Types.ROW(Array("access", "brand"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING)))
> .field("advs", ObjectArrayTypeInfo.getInfoFor(new 
> Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING
>   )
>   .inAppendMode()
>   .registerTableSource("aggs_test")
> The code above is dataSchema,i tried this way  
> [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but
>   when i execute this sql: 
> val sql1 =
>   """
> |SELECT
> |ip,
> |ts,
> |environment,
> |adv
> |FROM aggs_test
> |CROSS JOIN UNNEST(advs) AS t (adv)
> |""".stripMargin
> It report an error:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 8, column 31 to line 8, column 33: List of 
> column aliases must have same degree as table; table has 2 columns ('access', 
> 'brand'), whereas alias list has 1 columns
>   at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128)
>   at 
> org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83)
>   at 
> org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, 
> column 31 to line 8, column 33: List of column aliases must have same degree 
> as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 
> columns
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4807)
>   at 
> org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:86)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
>   at 
> org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:41)
>   at 
> org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101)
>   at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:191)
>   at 
> org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:156)
>   at 
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5699)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5684)
>   at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5291)
>   at 
> 

[jira] [Comment Edited] (FLINK-18437) Error message is not correct when using UNNEST

2020-06-28 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17147193#comment-17147193
 ] 

mzz edited comment on FLINK-18437 at 6/28/20, 9:16 AM:
---

*Schema:*
{code:java}
.withSchema(new Schema()
  .field("ip", Types.STRING())
  .field("ts", Types.STRING())
  .field("environment", schemaEnvironment)
  .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,
    Types.ROW(
      Array("count", "sid", "eventid", "params"),
      Array[TypeInformation[_]](
        Types.STRING(), Types.STRING(), Types.STRING(),
        Types.ROW(
          Array("adid", "adtype", "ecpm"),
          Array[TypeInformation[_]](Types.STRING(), Types.STRING(), Types.STRING()))))))
)
{code}

when execute this sql:
{code:java}
val sql =
  """
|SELECT
|ip,
|ts,
|`count`,
|environment.access,
|environment.brand,
|sid,
|params.adid,
|eventid
|FROM aggs_test
|CROSS JOIN UNNEST(advs) AS t (`count`,sid, eventid,params)
|""".stripMargin
{code}

*log:*

11:31:54,379 WARN  org.apache.flink.metrics.MetricGroup - The operator name select: (ip, ts, count, environment.access AS access, environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the 80 characters length limit and was truncated.


*The console keeps flooding with this warning, and the job fails to produce normal output. Which parameter should I modify? Thanks.*





was (Author: mzz_q):
*Schema:*
{code:java}
.withSchema(new Schema()
  .field("ip", Types.STRING())
  .field("ts", Types.STRING())
  .field("environment", schemaEnvironment)
  .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,
    Types.ROW(
      Array("count", "sid", "eventid", "params"),
      Array[TypeInformation[_]](
        Types.STRING(), Types.STRING(), Types.STRING(),
        Types.ROW(
          Array("adid", "adtype", "ecpm"),
          Array[TypeInformation[_]](Types.STRING(), Types.STRING(), Types.STRING()))))))
)
{code}

when execute this sql:
{code:java}
val sql =
  """
|SELECT
|ip,
|ts,
|`count`,
|environment.access,
|environment.brand,
|sid,
|params.adid,
|eventid
|FROM aggs_test
|CROSS JOIN UNNEST(advs) AS t (`count`,sid, eventid,params)
|""".stripMargin
{code}

*log:*

11:31:54,379 WARN  org.apache.flink.metrics.MetricGroup - The operator name select: (ip, ts, count, environment.access AS access, environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the 80 characters length limit and was truncated.


*This warning causes the job to fail to produce normal output. Which parameter should I modify? Thanks.*




> Error message is not correct when using UNNEST
> --
>
> Key: FLINK-18437
> URL: https://issues.apache.org/jira/browse/FLINK-18437
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.9.3
>Reporter: mzz
>Priority: Critical
>
>  .withSchema(new Schema()
> .field("ip", Types.STRING())
> .field("ts", Types.STRING())
> .field("environment", Types.ROW(Array("access", "brand"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING)))
> .field("advs", ObjectArrayTypeInfo.getInfoFor(new 
> Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING
>   )
>   .inAppendMode()
>   .registerTableSource("aggs_test")
> The code above is dataSchema,i tried this way  
> [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but
>   when i execute this sql: 
> val sql1 =
>   """
> |SELECT
> |ip,
> |ts,
> |environment,
> |adv
> |FROM aggs_test
> |CROSS JOIN UNNEST(advs) AS t (adv)
> |""".stripMargin
> It report an error:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 8, column 31 to line 8, column 33: List of 
> column aliases must have same degree as table; table has 2 columns ('access', 
> 'brand'), whereas alias list has 1 columns
>   at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128)
>   at 
> org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83)
>   at 
> org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala)
> 

[jira] [Updated] (FLINK-18437) Error message is not correct when using UNNEST

2020-06-28 Thread mzz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mzz updated FLINK-18437:

Issue Type: Bug  (was: Improvement)

> Error message is not correct when using UNNEST
> --
>
> Key: FLINK-18437
> URL: https://issues.apache.org/jira/browse/FLINK-18437
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.9.3
>Reporter: mzz
>Priority: Critical
>
>  .withSchema(new Schema()
> .field("ip", Types.STRING())
> .field("ts", Types.STRING())
> .field("environment", Types.ROW(Array("access", "brand"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING)))
> .field("advs", ObjectArrayTypeInfo.getInfoFor(new 
> Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING
>   )
>   .inAppendMode()
>   .registerTableSource("aggs_test")
> The code above is dataSchema,i tried this way  
> [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but
>   when i execute this sql: 
> val sql1 =
>   """
> |SELECT
> |ip,
> |ts,
> |environment,
> |adv
> |FROM aggs_test
> |CROSS JOIN UNNEST(advs) AS t (adv)
> |""".stripMargin
> It report an error:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 8, column 31 to line 8, column 33: List of 
> column aliases must have same degree as table; table has 2 columns ('access', 
> 'brand'), whereas alias list has 1 columns
>   at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128)
>   at 
> org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83)
>   at 
> org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, 
> column 31 to line 8, column 33: List of column aliases must have same degree 
> as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 
> columns
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4807)
>   at 
> org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:86)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
>   at 
> org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:41)
>   at 
> org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101)
>   at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:191)
>   at 
> org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:156)
>   at 
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5699)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5684)
>   at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5291)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3126)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3108)
>   at 
> 

[jira] [Comment Edited] (FLINK-18437) Error message is not correct when using UNNEST

2020-06-28 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17147193#comment-17147193
 ] 

mzz edited comment on FLINK-18437 at 6/28/20, 9:11 AM:
---

*Schema:*
{code:java}
.withSchema(new Schema()
  .field("ip", Types.STRING())
  .field("ts", Types.STRING())
  .field("environment", schemaEnvironment)
  .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,
    Types.ROW(
      Array("count", "sid", "eventid", "params"),
      Array[TypeInformation[_]](
        Types.STRING(), Types.STRING(), Types.STRING(),
        Types.ROW(
          Array("adid", "adtype", "ecpm"),
          Array[TypeInformation[_]](Types.STRING(), Types.STRING(), Types.STRING()))))))
)
{code}

when execute this sql:
{code:java}
val sql =
  """
|SELECT
|ip,
|ts,
|`count`,
|environment.access,
|environment.brand,
|sid,
|params.adid,
|eventid
|FROM aggs_test
|CROSS JOIN UNNEST(advs) AS t (`count`,sid, eventid,params)
|""".stripMargin
{code}

*log:*

11:31:54,379 WARN  org.apache.flink.metrics.MetricGroup - The operator name select: (ip, ts, count, environment.access AS access, environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the 80 characters length limit and was truncated.


*This warning causes the job to fail to produce normal output. Which parameter should I modify? Thanks.*





was (Author: mzz_q):
*Schema:*
{code:java}
.withSchema(new Schema()
  .field("ip", Types.STRING())
  .field("ts", Types.STRING())
  .field("environment", schemaEnvironment)
  .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,
    Types.ROW(
      Array("count", "sid", "eventid", "params"),
      Array[TypeInformation[_]](
        Types.STRING(), Types.STRING(), Types.STRING(),
        Types.ROW(
          Array("adid", "adtype", "ecpm"),
          Array[TypeInformation[_]](Types.STRING(), Types.STRING(), Types.STRING()))))))
)
{code}

when execute this sql:
{code:java}
val sql =
  """
|SELECT
|ip,
|ts,
|`count`,
|environment.access,
|environment.brand,
|sid,
|params.adid,
|eventid
|FROM aggs_test
|CROSS JOIN UNNEST(advs) AS t (`count`,sid, eventid,params)
|""".stripMargin
{code}

*log:*

11:31:54,379 WARN  org.apache.flink.metrics.MetricGroup - The operator name select: (ip, ts, count, environment.access AS access, environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the 80 characters length limit and was truncated.


*Which parameter should I modify? Thanks.*




> Error message is not correct when using UNNEST
> --
>
> Key: FLINK-18437
> URL: https://issues.apache.org/jira/browse/FLINK-18437
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.9.3
>Reporter: mzz
>Priority: Critical
>
>  .withSchema(new Schema()
> .field("ip", Types.STRING())
> .field("ts", Types.STRING())
> .field("environment", Types.ROW(Array("access", "brand"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING)))
> .field("advs", ObjectArrayTypeInfo.getInfoFor(new 
> Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING
>   )
>   .inAppendMode()
>   .registerTableSource("aggs_test")
> The code above is dataSchema,i tried this way  
> [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but
>   when i execute this sql: 
> val sql1 =
>   """
> |SELECT
> |ip,
> |ts,
> |environment,
> |adv
> |FROM aggs_test
> |CROSS JOIN UNNEST(advs) AS t (adv)
> |""".stripMargin
> It report an error:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 8, column 31 to line 8, column 33: List of 
> column aliases must have same degree as table; table has 2 columns ('access', 
> 'brand'), whereas alias list has 1 columns
>   at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128)
>   at 
> org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83)
>   at 
> org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, 

[jira] [Updated] (FLINK-18437) Error message is not correct when using UNNEST

2020-06-28 Thread mzz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mzz updated FLINK-18437:

Priority: Critical  (was: Major)

> Error message is not correct when using UNNEST
> --
>
> Key: FLINK-18437
> URL: https://issues.apache.org/jira/browse/FLINK-18437
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.9.3
>Reporter: mzz
>Priority: Critical
>
>  .withSchema(new Schema()
> .field("ip", Types.STRING())
> .field("ts", Types.STRING())
> .field("environment", Types.ROW(Array("access", "brand"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING)))
> .field("advs", ObjectArrayTypeInfo.getInfoFor(new 
> Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING
>   )
>   .inAppendMode()
>   .registerTableSource("aggs_test")
> The code above is dataSchema,i tried this way  
> [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but
>   when i execute this sql: 
> val sql1 =
>   """
> |SELECT
> |ip,
> |ts,
> |environment,
> |adv
> |FROM aggs_test
> |CROSS JOIN UNNEST(advs) AS t (adv)
> |""".stripMargin
> It report an error:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 8, column 31 to line 8, column 33: List of 
> column aliases must have same degree as table; table has 2 columns ('access', 
> 'brand'), whereas alias list has 1 columns
>   at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128)
>   at 
> org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83)
>   at 
> org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, 
> column 31 to line 8, column 33: List of column aliases must have same degree 
> as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 
> columns
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4807)
>   at 
> org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:86)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
>   at 
> org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:41)
>   at 
> org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101)
>   at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:191)
>   at 
> org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:156)
>   at 
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5699)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5684)
>   at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5291)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3126)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3108)
>   at 
> 

[jira] [Commented] (FLINK-18437) Error message is not correct when using UNNEST

2020-06-27 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17147193#comment-17147193
 ] 

mzz commented on FLINK-18437:
-

*Schema:*
{code:java}
.withSchema(new Schema()
  .field("ip", Types.STRING())
  .field("ts", Types.STRING())
  .field("environment", schemaEnvironment)
  .field("advs", ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,
    Types.ROW(
      Array("count", "sid", "eventid", "params"),
      Array[TypeInformation[_]](
        Types.STRING(), Types.STRING(), Types.STRING(),
        Types.ROW(
          Array("adid", "adtype", "ecpm"),
          Array[TypeInformation[_]](Types.STRING(), Types.STRING(), Types.STRING()))))))
)
{code}

when execute this sql:
{code:java}
val sql =
  """
|SELECT
|ip,
|ts,
|`count`,
|environment.access,
|environment.brand,
|sid,
|params.adid,
|eventid
|FROM aggs_test
|CROSS JOIN UNNEST(advs) AS t (`count`,sid, eventid,params)
|""".stripMargin
{code}

*log:*

11:31:54,379 WARN  org.apache.flink.metrics.MetricGroup - The operator name select: (ip, ts, count, environment.access AS access, environment.brand AS brand, sid, params.adid AS adid, eventid) exceeded the 80 characters length limit and was truncated.


*Which parameter should I modify? Thanks.*




> Error message is not correct when using UNNEST
> --
>
> Key: FLINK-18437
> URL: https://issues.apache.org/jira/browse/FLINK-18437
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.9.3
>Reporter: mzz
>Priority: Major
>
>  .withSchema(new Schema()
> .field("ip", Types.STRING())
> .field("ts", Types.STRING())
> .field("environment", Types.ROW(Array("access", "brand"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING)))
> .field("advs", ObjectArrayTypeInfo.getInfoFor(new 
> Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING
>   )
>   .inAppendMode()
>   .registerTableSource("aggs_test")
> The code above is dataSchema,i tried this way  
> [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but
>   when i execute this sql: 
> val sql1 =
>   """
> |SELECT
> |ip,
> |ts,
> |environment,
> |adv
> |FROM aggs_test
> |CROSS JOIN UNNEST(advs) AS t (adv)
> |""".stripMargin
> It report an error:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 8, column 31 to line 8, column 33: List of 
> column aliases must have same degree as table; table has 2 columns ('access', 
> 'brand'), whereas alias list has 1 columns
>   at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128)
>   at 
> org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83)
>   at 
> org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, 
> column 31 to line 8, column 33: List of column aliases must have same degree 
> as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 
> columns
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4807)
>   at 
> org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:86)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>   at 
> 

[jira] [Updated] (FLINK-18437) Error message is not correct when using UNNEST

2020-06-27 Thread mzz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mzz updated FLINK-18437:

Priority: Major  (was: Minor)

> Error message is not correct when using UNNEST
> --
>
> Key: FLINK-18437
> URL: https://issues.apache.org/jira/browse/FLINK-18437
> Project: Flink
>  Issue Type: Test
>  Components: API / DataStream
>Affects Versions: 1.9.3
>Reporter: mzz
>Priority: Major
>
>  .withSchema(new Schema()
> .field("ip", Types.STRING())
> .field("ts", Types.STRING())
> .field("environment", Types.ROW(Array("access", "brand"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING)))
> .field("advs", ObjectArrayTypeInfo.getInfoFor(new 
> Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING
>   )
>   .inAppendMode()
>   .registerTableSource("aggs_test")
> The code above is dataSchema,i tried this way  
> [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but
>   when i execute this sql: 
> val sql1 =
>   """
> |SELECT
> |ip,
> |ts,
> |environment,
> |adv
> |FROM aggs_test
> |CROSS JOIN UNNEST(advs) AS t (adv)
> |""".stripMargin
> It report an error:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 8, column 31 to line 8, column 33: List of 
> column aliases must have same degree as table; table has 2 columns ('access', 
> 'brand'), whereas alias list has 1 columns
>   at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128)
>   at 
> org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83)
>   at 
> org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, 
> column 31 to line 8, column 33: List of column aliases must have same degree 
> as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 
> columns
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4807)
>   at 
> org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:86)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
>   at 
> org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:41)
>   at 
> org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101)
>   at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:191)
>   at 
> org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:156)
>   at 
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5699)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5684)
>   at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5291)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3126)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3108)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3093)
> 

[jira] [Updated] (FLINK-18437) Error message is not correct when using UNNEST

2020-06-27 Thread mzz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mzz updated FLINK-18437:

Issue Type: Improvement  (was: Test)

> Error message is not correct when using UNNEST
> --
>
> Key: FLINK-18437
> URL: https://issues.apache.org/jira/browse/FLINK-18437
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.9.3
>Reporter: mzz
>Priority: Major
>
>  .withSchema(new Schema()
> .field("ip", Types.STRING())
> .field("ts", Types.STRING())
> .field("environment", Types.ROW(Array("access", "brand"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING)))
> .field("advs", ObjectArrayTypeInfo.getInfoFor(new 
> Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING
>   )
>   .inAppendMode()
>   .registerTableSource("aggs_test")
> The code above is dataSchema,i tried this way  
> [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but
>   when i execute this sql: 
> val sql1 =
>   """
> |SELECT
> |ip,
> |ts,
> |environment,
> |adv
> |FROM aggs_test
> |CROSS JOIN UNNEST(advs) AS t (adv)
> |""".stripMargin
> It report an error:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 8, column 31 to line 8, column 33: List of 
> column aliases must have same degree as table; table has 2 columns ('access', 
> 'brand'), whereas alias list has 1 columns
>   at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128)
>   at 
> org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83)
>   at 
> org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, 
> column 31 to line 8, column 33: List of column aliases must have same degree 
> as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 
> columns
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4807)
>   at 
> org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:86)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
>   at 
> org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:41)
>   at 
> org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101)
>   at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:191)
>   at 
> org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:156)
>   at 
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5699)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5684)
>   at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5291)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3126)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3108)
>   at 
> 

[jira] [Updated] (FLINK-18437) Table API has no Functions like sparkSQL explode

2020-06-27 Thread mzz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mzz updated FLINK-18437:

Issue Type: Test  (was: Bug)

> Table API has no Functions like  sparkSQL explode
> -
>
> Key: FLINK-18437
> URL: https://issues.apache.org/jira/browse/FLINK-18437
> Project: Flink
>  Issue Type: Test
>  Components: API / DataStream
>Affects Versions: 1.9.3
>Reporter: mzz
>Priority: Minor
>
>  .withSchema(new Schema()
> .field("ip", Types.STRING())
> .field("ts", Types.STRING())
> .field("environment", Types.ROW(Array("access", "brand"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING)))
> .field("advs", ObjectArrayTypeInfo.getInfoFor(new 
> Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING
>   )
>   .inAppendMode()
>   .registerTableSource("aggs_test")
> The code above is dataSchema,i tried this way  
> [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but
>   when i execute this sql: 
> val sql1 =
>   """
> |SELECT
> |ip,
> |ts,
> |environment,
> |adv
> |FROM aggs_test
> |CROSS JOIN UNNEST(advs) AS t (adv)
> |""".stripMargin
> It report an error:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 8, column 31 to line 8, column 33: List of 
> column aliases must have same degree as table; table has 2 columns ('access', 
> 'brand'), whereas alias list has 1 columns
>   at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128)
>   at 
> org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83)
>   at 
> org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, 
> column 31 to line 8, column 33: List of column aliases must have same degree 
> as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 
> columns
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4807)
>   at 
> org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:86)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
>   at 
> org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:41)
>   at 
> org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101)
>   at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:191)
>   at 
> org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:156)
>   at 
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5699)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5684)
>   at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5291)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3126)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3108)
>   at 
> 

[jira] [Commented] (FLINK-18437) Table API has no Functions like sparkSQL explode

2020-06-27 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17147162#comment-17147162
 ] 

mzz commented on FLINK-18437:
-

[~jark]
Thank you for your advice; the problem is solved.
The error log led me in the wrong direction: `environment` ('access', 'brand') is a JSON object and `advs` (["count","eventid"]) is a JSON array, yet the error says "table has 2 columns ('access', 'brand'), whereas alias list has 1 columns", i.e. it points at `environment` rather than the unnested `advs`.
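
For the record, the fix is to give the UNNEST alias list the same degree as the array's ROW element type. A minimal sketch based on the schema in this issue (the advs element here has two fields, so two aliases):

{code:java}
-- fails: one alias for a two-field ROW element
CROSS JOIN UNNEST(advs) AS t (adv)

-- works: the alias list matches the element's arity
CROSS JOIN UNNEST(advs) AS t (`count`, eventid)
{code}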





> Table API has no Functions like  sparkSQL explode
> -
>
> Key: FLINK-18437
> URL: https://issues.apache.org/jira/browse/FLINK-18437
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.9.3
>Reporter: mzz
>Priority: Major
>
>  .withSchema(new Schema()
> .field("ip", Types.STRING())
> .field("ts", Types.STRING())
> .field("environment", Types.ROW(Array("access", "brand"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING)))
> .field("advs", ObjectArrayTypeInfo.getInfoFor(new 
> Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING
>   )
>   .inAppendMode()
>   .registerTableSource("aggs_test")
> The code above is dataSchema,i tried this way  
> [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but
>   when i execute this sql: 
> val sql1 =
>   """
> |SELECT
> |ip,
> |ts,
> |environment,
> |adv
> |FROM aggs_test
> |CROSS JOIN UNNEST(advs) AS t (adv)
> |""".stripMargin
> It report an error:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 8, column 31 to line 8, column 33: List of 
> column aliases must have same degree as table; table has 2 columns ('access', 
> 'brand'), whereas alias list has 1 columns
>   at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128)
>   at 
> org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83)
>   at 
> org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, 
> column 31 to line 8, column 33: List of column aliases must have same degree 
> as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 
> columns
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4807)
>   at 
> org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:86)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
>   at 
> org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:41)
>   at 
> org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101)
>   at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:191)
>   at 
> org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:156)
>   at 
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5699)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5684)
>   at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5291)
>  

[jira] [Updated] (FLINK-18437) Table API has no Functions like sparkSQL explode

2020-06-27 Thread mzz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mzz updated FLINK-18437:

Priority: Minor  (was: Major)

> Table API has no Functions like  sparkSQL explode
> -
>
> Key: FLINK-18437
> URL: https://issues.apache.org/jira/browse/FLINK-18437
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.9.3
>Reporter: mzz
>Priority: Minor
>
>  .withSchema(new Schema()
> .field("ip", Types.STRING())
> .field("ts", Types.STRING())
> .field("environment", Types.ROW(Array("access", "brand"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING)))
> .field("advs", ObjectArrayTypeInfo.getInfoFor(new 
> Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING
>   )
>   .inAppendMode()
>   .registerTableSource("aggs_test")
> The code above is dataSchema,i tried this way  
> [https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html|http://example.com],but
>   when i execute this sql: 
> val sql1 =
>   """
> |SELECT
> |ip,
> |ts,
> |environment,
> |adv
> |FROM aggs_test
> |CROSS JOIN UNNEST(advs) AS t (adv)
> |""".stripMargin
> It report an error:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 8, column 31 to line 8, column 33: List of 
> column aliases must have same degree as table; table has 2 columns ('access', 
> 'brand'), whereas alias list has 1 columns
>   at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128)
>   at 
> org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83)
>   at 
> org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, 
> column 31 to line 8, column 33: List of column aliases must have same degree 
> as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 
> columns
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4807)
>   at 
> org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:86)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
>   at 
> org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:41)
>   at 
> org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101)
>   at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:191)
>   at 
> org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:156)
>   at 
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5699)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5684)
>   at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5291)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3126)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3108)
>   at 
> 

[jira] [Commented] (FLINK-18437) Table API has no Functions like sparkSQL explode

2020-06-27 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17147148#comment-17147148
 ] 

mzz commented on FLINK-18437:
-

execute: select * from table, and print the schema:
root
 |-- ip: STRING
 |-- ts: STRING
 |-- environment: ROW<`access` STRING, `brand` STRING>
 |-- advs: ARRAY<ROW<`count` STRING, `eventid` STRING>>

I want to split the array into multiple rows, thanks.
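
A hedged note on the validation error quoted below: UNNEST over an array of
ROWs appears to require the alias list to name every column of the element row
type, not the row as a whole. A minimal, untested rewrite under that
assumption (the alias names cnt and eventid are hypothetical):

{code:scala}
// sketch only: one alias per column of the ROW<count, eventid> elements
// of advs, instead of the single alias `adv` that triggered the
// "alias list has 1 columns" degree mismatch
val sqlFixed =
  """
    |SELECT ip, ts, environment, t.cnt, t.eventid
    |FROM aggs_test
    |CROSS JOIN UNNEST(advs) AS t (cnt, eventid)
    |""".stripMargin
{code}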

> Table API has no Functions like  sparkSQL explode
> -
>
> Key: FLINK-18437
> URL: https://issues.apache.org/jira/browse/FLINK-18437
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.9.3
>Reporter: mzz
>Priority: Major
>
>  .withSchema(new Schema()
> .field("ip", Types.STRING())
> .field("ts", Types.STRING())
> .field("environment", Types.ROW(Array("access", "brand"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING)))
> .field("advs", ObjectArrayTypeInfo.getInfoFor(new 
> Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING))))
>   )
>   .inAppendMode()
>   .registerTableSource("aggs_test")
> The code above is the data schema. I tried the approach described at
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html,
> but when I execute this SQL:
> val sql1 =
>   """
> |SELECT
> |ip,
> |ts,
> |environment,
> |adv
> |FROM aggs_test
> |CROSS JOIN UNNEST(advs) AS t (adv)
> |""".stripMargin
> It reports an error:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 8, column 31 to line 8, column 33: List of 
> column aliases must have same degree as table; table has 2 columns ('access', 
> 'brand'), whereas alias list has 1 columns
>   at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128)
>   at 
> org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83)
>   at 
> org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88)
>   at 
> QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, 
> column 31 to line 8, column 33: List of column aliases must have same degree 
> as table; table has 2 columns ('access', 'brand'), whereas alias list has 1 
> columns
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4807)
>   at 
> org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:86)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
>   at 
> org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:41)
>   at 
> org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101)
>   at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:191)
>   at 
> org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:156)
>   at 
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5699)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5684)
>   at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5291)
>   at 
> 

[jira] [Created] (FLINK-18437) Table API has no Functions like sparkSQL explode

2020-06-27 Thread mzz (Jira)
mzz created FLINK-18437:
---

 Summary: Table API has no Functions like  sparkSQL explode
 Key: FLINK-18437
 URL: https://issues.apache.org/jira/browse/FLINK-18437
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.9.3
Reporter: mzz


 .withSchema(new Schema()
.field("ip", Types.STRING())
.field("ts", Types.STRING())
.field("environment", Types.ROW(Array("access", "brand"), 
Array[TypeInformation[_]](Types.STRING(), Types.STRING)))
.field("advs", ObjectArrayTypeInfo.getInfoFor(new 
Array[Row](0).getClass, Types.ROW(Array("count", "eventid"), 
Array[TypeInformation[_]](Types.STRING(), Types.STRING))))
  )
  .inAppendMode()
  .registerTableSource("aggs_test")



The code above is the data schema. I tried the approach described at
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html,
but when I execute this SQL:
val sql1 =
  """
|SELECT
|ip,
|ts,
|environment,
|adv
|FROM aggs_test
|CROSS JOIN UNNEST(advs) AS t (adv)
|""".stripMargin

It reports an error:
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. From line 8, column 31 to line 8, column 33: List of column 
aliases must have same degree as table; table has 2 columns ('access', 
'brand'), whereas alias list has 1 columns
at 
org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:128)
at 
org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:83)
at 
org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:115)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:298)
at 
QM.COM.Flink.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:88)
at 
QM.COM.Flink.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 8, 
column 31 to line 8, column 33: List of column aliases must have same degree as 
table; table has 2 columns ('access', 'brand'), whereas alias list has 1 columns
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4807)
at 
org.apache.calcite.sql.validate.AliasNamespace.validateImpl(AliasNamespace.java:86)
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
at 
org.apache.calcite.sql.validate.AbstractNamespace.getRowType(AbstractNamespace.java:115)
at 
org.apache.calcite.sql.validate.AliasNamespace.getRowType(AliasNamespace.java:41)
at 
org.apache.calcite.sql.validate.DelegatingScope.resolveInNamespace(DelegatingScope.java:101)
at org.apache.calcite.sql.validate.ListScope.resolve(ListScope.java:191)
at 
org.apache.calcite.sql.validate.ListScope.findQualifyingTableNames(ListScope.java:156)
at 
org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:238)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5699)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5684)
at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5291)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3126)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3108)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3093)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3158)
at 
org.apache.flink.table.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.scala:67)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3102)
at 

[jira] [Commented] (FLINK-18230) Table API has no Functions like sparkSQL explode

2020-06-09 Thread mzz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17130097#comment-17130097
 ] 

mzz commented on FLINK-18230:
-

As for my SQL, my idea is to flatten the nested Array, like the sparkSQL
explode function.
SparkSQL example:
"select ip, ts, launchs, adv.`count` from aggs_test LATERAL VIEW explode(advs) a
AS adv"
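
For comparison, a minimal Flink SQL sketch (untested) of the same flattening:
CROSS JOIN UNNEST plays the role of LATERAL VIEW explode, emitting one row per
element of advs. The alias names (cnt, eventid, sid, params) are hypothetical
and must match the degree of the element ROW type declared in the schema:

{code:scala}
// sketch only: aliasing every column of the ROW elements of advs;
// renaming `count` to cnt also sidesteps the reserved keyword
val flinkExplodeSql =
  """
    |SELECT ip, ts, launchs, t.cnt
    |FROM aggs_test
    |CROSS JOIN UNNEST(advs) AS t (cnt, eventid, sid, params)
    |""".stripMargin
{code}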

> Table API has no Functions like  sparkSQL explode
> -
>
> Key: FLINK-18230
> URL: https://issues.apache.org/jira/browse/FLINK-18230
> Project: Flink
>  Issue Type: Improvement
>Reporter: mzz
>Priority: Major
>
> streamTableEnvironment.connect(new Kafka()
>   .topic(TOPIC)
>   .version(VERSION)
>   .startFromEarliest()
>   .property("bootstrap.servers", "172.16.30.207:9092")
>   .property("group.id", "km_aggs_group_3")
> )
>   .withFormat(
> new Json()
>   .failOnMissingField(true)
>   .deriveSchema()
>   )
>   .withSchema(new Schema()
> .field("devs", Types.STRING())
> .field("advs", ObjectArrayTypeInfo.getInfoFor(new 
> Array[Row](0).getClass,
> Types.ROW(Array("count", "eventid", "sid", "params"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING, Types.STRING,
>   Types.ROW(Array("adid", "adtype", "ecpm"), 
> Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.STRING))))
>   ))
> .field("identity", Types.STRING())
> .field("ip", Types.STRING())
> .field("launchs", Types.STRING())
> .field("ts", Types.STRING())
>   )
>   .inAppendMode()
>   .registerTableSource("aggs_test")
> val tableResult = streamTableEnvironment.sqlQuery("select ip, 
> ts,launchs,advs[1].`count` from aggs_test")
> tableResult.printSchema()
> streamTableEnvironment.toAppendStream[Row](tableResult).print()



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18230) Table API has no Functions like sparkSQL explode

2020-06-09 Thread mzz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mzz updated FLINK-18230:

Issue Type: Improvement  (was: Bug)

> Table API has no Functions like  sparkSQL explode
> -
>
> Key: FLINK-18230
> URL: https://issues.apache.org/jira/browse/FLINK-18230
> Project: Flink
>  Issue Type: Improvement
>Reporter: mzz
>Priority: Major
>
> streamTableEnvironment.connect(new Kafka()
>   .topic(TOPIC)
>   .version(VERSION)
>   .startFromEarliest()
>   .property("bootstrap.servers", "172.16.30.207:9092")
>   .property("group.id", "km_aggs_group_3")
> )
>   .withFormat(
> new Json()
>   .failOnMissingField(true)
>   .deriveSchema()
>   )
>   .withSchema(new Schema()
> .field("devs", Types.STRING())
> .field("advs", ObjectArrayTypeInfo.getInfoFor(new 
> Array[Row](0).getClass,
> Types.ROW(Array("count", "eventid", "sid", "params"), 
> Array[TypeInformation[_]](Types.STRING(), Types.STRING, Types.STRING,
>   Types.ROW(Array("adid", "adtype", "ecpm"), 
> Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.STRING))))
>   ))
> .field("identity", Types.STRING())
> .field("ip", Types.STRING())
> .field("launchs", Types.STRING())
> .field("ts", Types.STRING())
>   )
>   .inAppendMode()
>   .registerTableSource("aggs_test")
> val tableResult = streamTableEnvironment.sqlQuery("select ip, 
> ts,launchs,advs[1].`count` from aggs_test")
> tableResult.printSchema()
> streamTableEnvironment.toAppendStream[Row](tableResult).print()



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18230) Table API has no Functions like sparkSQL explode

2020-06-09 Thread mzz (Jira)
mzz created FLINK-18230:
---

 Summary: Table API has no Functions like  sparkSQL explode
 Key: FLINK-18230
 URL: https://issues.apache.org/jira/browse/FLINK-18230
 Project: Flink
  Issue Type: Bug
Reporter: mzz


streamTableEnvironment.connect(new Kafka()
  .topic(TOPIC)
  .version(VERSION)
  .startFromEarliest()
  .property("bootstrap.servers", "172.16.30.207:9092")
  .property("group.id", "km_aggs_group_3")
)
  .withFormat(
new Json()
  .failOnMissingField(true)
  .deriveSchema()
  )
  .withSchema(new Schema()
.field("devs", Types.STRING())
.field("advs", ObjectArrayTypeInfo.getInfoFor(new 
Array[Row](0).getClass,
Types.ROW(Array("count", "eventid", "sid", "params"), 
Array[TypeInformation[_]](Types.STRING(), Types.STRING, Types.STRING,
  Types.ROW(Array("adid", "adtype", "ecpm"), 
Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.STRING))))
  ))
.field("identity", Types.STRING())
.field("ip", Types.STRING())
.field("launchs", Types.STRING())
.field("ts", Types.STRING())
  )
  .inAppendMode()
  .registerTableSource("aggs_test")

val tableResult = streamTableEnvironment.sqlQuery("select ip, 
ts,launchs,advs[1].`count` from aggs_test")
tableResult.printSchema()
streamTableEnvironment.toAppendStream[Row](tableResult).print()



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-18184) Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory'

2020-06-08 Thread mzz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mzz updated FLINK-18184:

Priority: Major  (was: Critical)

> Could not find a suitable table factory for 
> 'org.apache.flink.table.factories.TableSourceFactory'
> -
>
> Key: FLINK-18184
> URL: https://issues.apache.org/jira/browse/FLINK-18184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.9.1
> Environment: local:macos
> flink1.9
>  
>Reporter: mzz
>Priority: Major
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.enableCheckpointing(5000) // checkpoint every 5000 msecs
> // Kafka configuration
> val properties = new Properties()
> properties.setProperty("bootstrap.servers", "172.16.30.207:9092")
> properties.setProperty("group.id", "km_aggs_group")
> val fsSettings = 
> EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
> val kafkaConsumer = new FlinkKafkaConsumer[String](TOPIC, new 
> SimpleStringSchema(), properties).setStartFromEarliest()
> //val source = env.addSource(kafkaConsumer)
> val streamTableEnvironment = StreamTableEnvironment.create(env,fsSettings)
> streamTableEnvironment.connect(new Kafka()
>   .topic(TOPIC)
>   .version(VERSION)
>   .startFromEarliest()
>   .property("bootstrap.servers", "172.16.30.207:9092")
>   .property("zookeeper.connect", "172.16.30.207:2181")
>   .property("group.id", "km_aggs_group_table")
>   //  .properties(properties)
> )
>   .withFormat(
> new Json()
>   .failOnMissingField(true)
>   .deriveSchema()
>   )
>   .withSchema(new Schema()
> .field("advs", Types.STRING())
> .field("devs", Types.STRING())
> .field("environment", Types.STRING())
> .field("events", Types.STRING())
> .field("identity", Types.STRING())
> .field("ip", Types.STRING())
> .field("launchs", Types.STRING())
> .field("ts", Types.STRING())
>   )
>   .inAppendMode()
>   .registerTableSource("aggs_test")
> val tableResult = streamTableEnvironment.sqlQuery("select * from 
> aggs_test")
> tableResult.printSchema()
> //streamTableEnvironment.toAppendStream[Row](tableResult).print()
> // start the job
> env.execute("test_kafka")
> 
> error message:
> Exception in thread "main" org.apache.flink.table.api.TableException: 
> findAndCreateTableSource failed.
>   at 
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
>   at 
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
>   at 
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
>   at 
> KM.COM.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:70)
>   at 
> KM.COM.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala)
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
> not find a suitable table factory for 
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
> Reason: No context matches.
> The following properties are requested:
> connector.properties.0.key=zookeeper.connect
> connector.properties.0.value=172.16.30.207:2181
> connector.properties.1.key=group.id
> connector.properties.1.value=km_aggs_group_table
> connector.properties.2.key=bootstrap.servers
> connector.properties.2.value=172.16.30.207:9092
> connector.property-version=1
> connector.startup-mode=earliest-offset
> connector.topic=aggs_topic
> connector.type=kafka
> connector.version=2.0
> format.derive-schema=true
> format.fail-on-missing-field=true
> format.property-version=1
> format.type=json
> schema.0.name=advs
> schema.0.type=VARCHAR
> schema.1.name=devs
> schema.1.type=VARCHAR
> schema.2.name=environment
> schema.2.type=VARCHAR
> schema.3.name=events
> schema.3.type=VARCHAR
> schema.4.name=identity
> schema.4.type=VARCHAR
> schema.5.name=ip
> schema.5.type=VARCHAR
> schema.6.name=launchs
> schema.6.type=VARCHAR
> schema.7.name=ts
> schema.7.type=VARCHAR
> update-mode=append
> The following factories have been considered:
> org.apache.flink.formats.json.JsonRowFormatFactory
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> org.apache.flink.table.planner.StreamPlannerFactory
> org.apache.flink.table.executor.StreamExecutorFactory
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
> 

[jira] [Created] (FLINK-18184) Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory'

2020-06-08 Thread mzz (Jira)
mzz created FLINK-18184:
---

 Summary: Could not find a suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory'
 Key: FLINK-18184
 URL: https://issues.apache.org/jira/browse/FLINK-18184
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.9.1
 Environment: local:macos

flink1.9

 
Reporter: mzz


val env = StreamExecutionEnvironment.getExecutionEnvironment

env.enableCheckpointing(5000) // checkpoint every 5000 msecs

// Kafka configuration
val properties = new Properties()
properties.setProperty("bootstrap.servers", "172.16.30.207:9092")
properties.setProperty("group.id", "km_aggs_group")


val fsSettings = 
EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val kafkaConsumer = new FlinkKafkaConsumer[String](TOPIC, new 
SimpleStringSchema(), properties).setStartFromEarliest()
//val source = env.addSource(kafkaConsumer)
val streamTableEnvironment = StreamTableEnvironment.create(env,fsSettings)
streamTableEnvironment.connect(new Kafka()
  .topic(TOPIC)
  .version(VERSION)
  .startFromEarliest()

  .property("bootstrap.servers", "172.16.30.207:9092")
  .property("zookeeper.connect", "172.16.30.207:2181")
  .property("group.id", "km_aggs_group_table")
  //  .properties(properties)
)
  .withFormat(
new Json()
  .failOnMissingField(true)
  .deriveSchema()
  )
  .withSchema(new Schema()
.field("advs", Types.STRING())
.field("devs", Types.STRING())
.field("environment", Types.STRING())
.field("events", Types.STRING())
.field("identity", Types.STRING())
.field("ip", Types.STRING())
.field("launchs", Types.STRING())
.field("ts", Types.STRING())
  )
  .inAppendMode()
  .registerTableSource("aggs_test")

val tableResult = streamTableEnvironment.sqlQuery("select * from aggs_test")
tableResult.printSchema()
//streamTableEnvironment.toAppendStream[Row](tableResult).print()
// start the job
env.execute("test_kafka")



error message:

Exception in thread "main" org.apache.flink.table.api.TableException: 
findAndCreateTableSource failed.
at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
at 
org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
at 
KM.COM.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:70)
at 
KM.COM.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: No context matches.

The following properties are requested:
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=172.16.30.207:2181
connector.properties.1.key=group.id
connector.properties.1.value=km_aggs_group_table
connector.properties.2.key=bootstrap.servers
connector.properties.2.value=172.16.30.207:9092
connector.property-version=1
connector.startup-mode=earliest-offset
connector.topic=aggs_topic
connector.type=kafka
connector.version=2.0
format.derive-schema=true
format.fail-on-missing-field=true
format.property-version=1
format.type=json
schema.0.name=advs
schema.0.type=VARCHAR
schema.1.name=devs
schema.1.type=VARCHAR
schema.2.name=environment
schema.2.type=VARCHAR
schema.3.name=events
schema.3.type=VARCHAR
schema.4.name=identity
schema.4.type=VARCHAR
schema.5.name=ip
schema.5.type=VARCHAR
schema.6.name=launchs
schema.6.type=VARCHAR
schema.7.name=ts
schema.7.type=VARCHAR
update-mode=append

The following factories have been considered:
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
at 
org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
at 
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
at
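
A hedged reading of the failure above: among the requested properties,
connector.version=2.0 matches none of the considered factories, while
KafkaTableSourceSinkFactory (the universal Kafka connector) matches version
"universal". An untested sketch of the descriptor change, assuming the
universal flink-connector-kafka jar is on the classpath:

{code:scala}
// sketch only: "universal" is the version string the universal
// KafkaTableSourceSinkFactory accepts; "2.0" is not a recognized
// connector.version value, hence "Reason: No context matches."
streamTableEnvironment.connect(new Kafka()
  .topic(TOPIC)
  .version("universal")  // instead of VERSION = "2.0"
  .startFromEarliest()
  .property("bootstrap.servers", "172.16.30.207:9092")
  .property("group.id", "km_aggs_group_table")
)
{code}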