[GitHub] zjffdu commented on a change in pull request #7121: [FLINK-9555]Support table api in scala shell

2018-11-29 Thread GitBox
zjffdu commented on a change in pull request #7121: [FLINK-9555]Support table 
api in scala shell
URL: https://github.com/apache/flink/pull/7121#discussion_r237735988
 
 

 ##
 File path: 
flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
 ##
 @@ -168,6 +168,56 @@ class ScalaShellITCase extends TestLogger {
 Assert.assertTrue(output.contains("WC(world,10)"))
   }
 
+  @Test
+  def testSimpleSelectWithFilterBatchTableAPIQuery: Unit = {
+val input =
+  """
+|import _root_.scala.collection.mutable
+|val data = new mutable.MutableList[(Int, Long, String)]
+|data.+=((1, 1L, "Hi"))
+|data.+=((2, 2L, "Hello"))
+|data.+=((3, 2L, "Hello world"))
+|val t = benv.fromCollection(data).toTable(btenv, 'a, 'b, 'c).select("*").where('a % 2 === 1 )
+|val results = t.toDataSet[Row].collect()
+|results.foreach(println)
+|:q
+  """.stripMargin
+val output = processInShell(input)
+Assert.assertFalse(output.contains("failed"))
+Assert.assertFalse(output.contains("error"))
+Assert.assertFalse(output.contains("Exception"))
 
 Review comment:
   We also need to verify that the output contains "Hi" and "Hello world", but 
not the filtered-out "Hello" row.

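A minimal sketch of such a check, written against a captured output string rather than the real shell. It assumes each `Row` is printed on its own line as comma-separated fields; `sampleOutput`, `printedRows` and `filterApplied` are hypothetical names, not part of the PR. Whole rows are compared because "Hello world" itself contains "Hello", so a plain `output.contains("Hello")` could not tell the two apart.

```scala
object BatchOutputCheck {
  // Hypothetical stand-in for the string returned by processInShell(input);
  // assumes each Row is printed on its own line as comma-separated fields.
  val sampleOutput: String =
    """1,1,Hi
      |3,2,Hello world
      |""".stripMargin

  // Trimmed, non-empty lines of the captured shell output.
  def printedRows(output: String): Set[String] =
    output.split("\n").map(_.trim).filter(_.nonEmpty).toSet

  // True when the rows kept by the filter are present and the dropped row is absent.
  def filterApplied(output: String): Boolean = {
    val rows = printedRows(output)
    rows.contains("1,1,Hi") &&
      rows.contains("3,2,Hello world") &&
      !rows.contains("2,2,Hello")
  }
}
```

In the test itself this would amount to something like `Assert.assertTrue(filterApplied(output))`, provided the real shell output matches the assumed row format.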

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zjffdu commented on a change in pull request #7121: [FLINK-9555]Support table api in scala shell

2018-11-29 Thread GitBox
zjffdu commented on a change in pull request #7121: [FLINK-9555]Support table 
api in scala shell
URL: https://github.com/apache/flink/pull/7121#discussion_r237736038
 
 

 ##
 File path: 
flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
 ##
 @@ -168,6 +168,56 @@ class ScalaShellITCase extends TestLogger {
 Assert.assertTrue(output.contains("WC(world,10)"))
   }
 
+  @Test
+  def testSimpleSelectWithFilterBatchTableAPIQuery: Unit = {
+val input =
+  """
+|import _root_.scala.collection.mutable
+|val data = new mutable.MutableList[(Int, Long, String)]
+|data.+=((1, 1L, "Hi"))
+|data.+=((2, 2L, "Hello"))
+|data.+=((3, 2L, "Hello world"))
+|val t = benv.fromCollection(data).toTable(btenv, 'a, 'b, 'c).select("*").where('a % 2 === 1 )
+|val results = t.toDataSet[Row].collect()
+|results.foreach(println)
+|:q
+  """.stripMargin
+val output = processInShell(input)
+Assert.assertFalse(output.contains("failed"))
+Assert.assertFalse(output.contains("error"))
+Assert.assertFalse(output.contains("Exception"))
+  }
+
+  @Test
+  def testGroupedAggregationStreamTableAPIQuery: Unit = {
+val input =
+  """
+|  val data = List(
+|("Hello", 1),
+|("word", 1),
+|("Hello", 1),
+|("bark", 1),
+|("bark", 1),
+|("bark", 1),
+|("bark", 1),
+|("bark", 1),
+|("bark", 1),
+|("flink", 1)
+|  )
+| val stream = senv.fromCollection(data)
+| val table = stream.toTable(stenv, 'word, 'num)
+| val resultTable = table.groupBy('word).select('num.sum as 'count).groupBy('count).select(
+| 'count,'count.count as 'frequency)
+| val results = resultTable.toRetractStream[Row]
+| results.print
+| senv.execute
+  """.stripMargin
+val output = processInShell(input)
+Assert.assertFalse(output.contains("failed"))
+Assert.assertFalse(output.contains("error"))
+Assert.assertFalse(output.contains("Exception"))
 
 Review comment:
   We also need to verify that the output contains the correct word count result.
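For reference, the expected final result can be worked out with plain Scala collections. This is only a sketch of what the query computes, not of how the retract stream prints it: a retract stream also emits intermediate add and retract messages, so an assertion should target the final state. `ExpectedFrequencies` and its members are hypothetical names.

```scala
object ExpectedFrequencies {
  // Same input as the shell snippet in the test.
  val data: List[(String, Int)] = List(
    ("Hello", 1), ("word", 1), ("Hello", 1),
    ("bark", 1), ("bark", 1), ("bark", 1),
    ("bark", 1), ("bark", 1), ("bark", 1),
    ("flink", 1))

  // Mirrors groupBy('word).select('num.sum as 'count).
  val countPerWord: Map[String, Int] =
    data.groupBy(_._1).map { case (word, rows) => word -> rows.map(_._2).sum }

  // Mirrors groupBy('count).select('count, 'count.count as 'frequency).
  val frequencyPerCount: Map[Int, Int] =
    countPerWord.values.toList
      .groupBy(identity)
      .map { case (count, words) => count -> words.size }
}
```

So the final rows should pair count 1 with frequency 2, count 2 with frequency 1, and count 6 with frequency 1.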




[jira] [Commented] (FLINK-9555) Support table api in scala shell

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704210#comment-16704210
 ] 

ASF GitHub Bot commented on FLINK-9555:
---

zjffdu commented on a change in pull request #7121: [FLINK-9555]Support table 
api in scala shell
URL: https://github.com/apache/flink/pull/7121#discussion_r237736038
 
 

 ##
 File path: 
flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
 ##
 @@ -168,6 +168,56 @@ class ScalaShellITCase extends TestLogger {
 Assert.assertTrue(output.contains("WC(world,10)"))
   }
 
+  @Test
+  def testSimpleSelectWithFilterBatchTableAPIQuery: Unit = {
+val input =
+  """
+|import _root_.scala.collection.mutable
+|val data = new mutable.MutableList[(Int, Long, String)]
+|data.+=((1, 1L, "Hi"))
+|data.+=((2, 2L, "Hello"))
+|data.+=((3, 2L, "Hello world"))
+|val t = benv.fromCollection(data).toTable(btenv, 'a, 'b, 'c).select("*").where('a % 2 === 1 )
+|val results = t.toDataSet[Row].collect()
+|results.foreach(println)
+|:q
+  """.stripMargin
+val output = processInShell(input)
+Assert.assertFalse(output.contains("failed"))
+Assert.assertFalse(output.contains("error"))
+Assert.assertFalse(output.contains("Exception"))
+  }
+
+  @Test
+  def testGroupedAggregationStreamTableAPIQuery: Unit = {
+val input =
+  """
+|  val data = List(
+|("Hello", 1),
+|("word", 1),
+|("Hello", 1),
+|("bark", 1),
+|("bark", 1),
+|("bark", 1),
+|("bark", 1),
+|("bark", 1),
+|("bark", 1),
+|("flink", 1)
+|  )
+| val stream = senv.fromCollection(data)
+| val table = stream.toTable(stenv, 'word, 'num)
+| val resultTable = table.groupBy('word).select('num.sum as 'count).groupBy('count).select(
+| 'count,'count.count as 'frequency)
+| val results = resultTable.toRetractStream[Row]
+| results.print
+| senv.execute
+  """.stripMargin
+val output = processInShell(input)
+Assert.assertFalse(output.contains("failed"))
+Assert.assertFalse(output.contains("error"))
+Assert.assertFalse(output.contains("Exception"))
 
 Review comment:
   We also need to verify that the output contains the correct word count result.




> Support table api in scala shell
> 
>
> Key: FLINK-9555
> URL: https://issues.apache.org/jira/browse/FLINK-9555
> Project: Flink
>  Issue Type: New Feature
>  Components: Scala Shell
>Affects Versions: 1.5.0
>Reporter: Jeff Zhang
>Assignee: shuiqiangchen
>Priority: Major
>  Labels: pull-request-available
>
> It would be nice to have table api available in scala shell so that user can 
> experience table api in interactive way. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9555) Support table api in scala shell

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704209#comment-16704209
 ] 

ASF GitHub Bot commented on FLINK-9555:
---

zjffdu commented on a change in pull request #7121: [FLINK-9555]Support table 
api in scala shell
URL: https://github.com/apache/flink/pull/7121#discussion_r237735988
 
 

 ##
 File path: 
flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
 ##
 @@ -168,6 +168,56 @@ class ScalaShellITCase extends TestLogger {
 Assert.assertTrue(output.contains("WC(world,10)"))
   }
 
+  @Test
+  def testSimpleSelectWithFilterBatchTableAPIQuery: Unit = {
+val input =
+  """
+|import _root_.scala.collection.mutable
+|val data = new mutable.MutableList[(Int, Long, String)]
+|data.+=((1, 1L, "Hi"))
+|data.+=((2, 2L, "Hello"))
+|data.+=((3, 2L, "Hello world"))
+|val t = benv.fromCollection(data).toTable(btenv, 'a, 'b, 'c).select("*").where('a % 2 === 1 )
+|val results = t.toDataSet[Row].collect()
+|results.foreach(println)
+|:q
+  """.stripMargin
+val output = processInShell(input)
+Assert.assertFalse(output.contains("failed"))
+Assert.assertFalse(output.contains("error"))
+Assert.assertFalse(output.contains("Exception"))
 
 Review comment:
   We also need to verify that the output contains "Hi" and "Hello world", but 
not the filtered-out "Hello" row.




> Support table api in scala shell
> 
>
> Key: FLINK-9555
> URL: https://issues.apache.org/jira/browse/FLINK-9555
> Project: Flink
>  Issue Type: New Feature
>  Components: Scala Shell
>Affects Versions: 1.5.0
>Reporter: Jeff Zhang
>Assignee: shuiqiangchen
>Priority: Major
>  Labels: pull-request-available
>
> It would be nice to have table api available in scala shell so that user can 
> experience table api in interactive way. 





[GitHub] dianfu commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread GitBox
dianfu commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237722370
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala
 ##
 @@ -35,20 +36,26 @@ import org.apache.flink.util.Collector
 class PatternSelectFunctionRunner(
 name: String,
 code: String)
-  extends PatternFlatSelectFunction[Row, CRow]
-  with Compiler[PatternSelectFunction[Row, Row]]
+  extends RichPatternFlatSelectFunction[Row, CRow]
 
 Review comment:
   @dawidwys I have reconsidered this issue. What about always generating a 
`PatternFlatSelectFunction`, regardless of whether `ONE ROW PER MATCH` or 
`ALL ROWS PER MATCH` is specified?
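A rough sketch of why the flat variant can cover both cases: a function that returns exactly one result per match can be wrapped as one that emits into a collector. The traits below are simplified local stand-ins (hypothetical names, with a plain `B => Unit` callback in place of Flink's `Collector`), not the CEP interfaces themselves.

```scala
object FlatSelectSketch {
  // A match maps each pattern variable to the events bound to it.
  type Match[A] = Map[String, List[A]]

  // Stand-in for a select function: exactly one result per match.
  trait SelectFn[A, B] {
    def select(m: Match[A]): B
  }

  // Stand-in for a flat select function: zero or more results per match.
  trait FlatSelectFn[A, B] {
    def flatSelect(m: Match[A], out: B => Unit): Unit
  }

  // Wraps a one-result function so callers only ever see the flat shape.
  def asFlat[A, B](f: SelectFn[A, B]): FlatSelectFn[A, B] =
    new FlatSelectFn[A, B] {
      def flatSelect(m: Match[A], out: B => Unit): Unit = out(f.select(m))
    }
}
```

With this shape the runner would need only one code path, with `ONE ROW PER MATCH` being the special case that emits a single row.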




[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704135#comment-16704135
 ] 

ASF GitHub Bot commented on FLINK-10597:


dianfu commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237722370
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala
 ##
 @@ -35,20 +36,26 @@ import org.apache.flink.util.Collector
 class PatternSelectFunctionRunner(
 name: String,
 code: String)
-  extends PatternFlatSelectFunction[Row, CRow]
-  with Compiler[PatternSelectFunction[Row, Row]]
+  extends RichPatternFlatSelectFunction[Row, CRow]
 
 Review comment:
   @dawidwys I have reconsidered this issue. What about always generating a 
`PatternFlatSelectFunction`, regardless of whether `ONE ROW PER MATCH` or 
`ALL ROWS PER MATCH` is specified?




> Enable UDFs support in MATCH_RECOGNIZE
> --
>
> Key: FLINK-10597
> URL: https://issues.apache.org/jira/browse/FLINK-10597
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>






[GitHub] dianfu commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE

2018-11-29 Thread GitBox
dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237727039
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +582,245 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((aggCall.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+  aggregates += aggCall
+  reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def doGenerateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val primitiveSingleResultTypeTerm = 
primitiveTypeTermForTypeInfo(singleResultType)
+  val boxedSingleResultTypeTerm = 
boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
 
 Review comment:
   `patternName` doesn't reflect what this variable holds. What about renaming 
`patternName` to `eventsExpr` or something similar?




[GitHub] dianfu commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE

2018-11-29 Thread GitBox
dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237739790
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +582,245 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((aggCall.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+  aggregates += aggCall
+  reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def doGenerateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val primitiveSingleResultTypeTerm = 
primitiveTypeTermForTypeInfo(singleResultType)
+  val boxedSingleResultTypeTerm = 
boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = 
calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg = if (nullCheck) {
+j"""
+   |boolean $singleResultNullTerm;
+   |$primitiveSingleResultTypeTerm $singleResultTerm;
+   |if ($resultRowTerm.getField(${aggregates.size}) != null) {
+   |  $singleResultTerm = ($boxedSingleResultTypeTerm) $resultRowTerm
+   |.getField(${aggregates.size});
+   |  $singleResultNullTerm = false;
+   |} else {
+   |  $singleResultNullTerm = true;
+   |  $singleResultTerm = $defaultValue;
+   |}
+   |""".stripMargin
+  } else {
+j"""
+   |boolean $singleResultNullTerm = false;
+   |$primitiveSingleResultTypeTerm $singleResultTerm =
+   |($boxedSingleResultTypeTerm) 
$resultRowTerm.getField(${aggregates.size});
+   |""".stripMargin
+  }
+
+  reusablePerRecordStatements += codeForSingleAgg
+
+  GeneratedExpression(singleResultTerm, singleResultNullTerm, NO_CODE, 
singleResultType)
+}
+
+def generateAggFunction() : Unit = {
+  val matchAgg = extractAggregatesAndExpressions
+
+  val aggGenerator = new AggregationCodeGenerator(config, false, input, 
None)
+
+  val aggFunc = aggGenerator.generateAggregations(
+s"AggFunction_$variableUID",
+matchAgg.inputExprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)),
+matchAgg.aggregations.map(_.aggFunction).toArray,
+matchAgg.aggregations.map(_.inputIndices).toArray,
+matchAgg.aggregations.indices.toArray,
+Array.fill(matchAgg.aggregations.size)(false),
+isStateBackedDataViews = false,
+partialResults = false,
+Array.emptyIntArray,
+None,
+matchAgg.aggregations.size,
+needRetract = false,
+needMerge = false,
+needReset = false,
+None
+  )
+
+  reusableMemberStatements.add(aggFunc.code)
+
+  val transformFuncName = s"transformRowForAgg_$variableUID"
+  val inputTransform: String = generateAggInputExprEvaluation(
+matchAgg.inputExprs,
+transformFuncName)
+
+  generateAggCalculation(aggFunc, transformFuncName, inputTransform)
+}
+
+private case class LogicalMatchAggCall(
+  function: SqlAggFunction,
+  inputTypes: Seq[RelDataType],
+  exprIndices: Seq[Int]
+)
+
+private case class MatchAggCall(
+  aggFunction: TableAggregateFunction[_, _],
+  inputIndices: Array[Int],
+  dataViews: Seq[DataViewSpec[_]]
+)
+
+private case class MatchAgg(
+  aggregations: Seq[MatchAggCall],
+  inputExprs: Seq[RexNode]
+)
+
+private def extractAggregatesAndExpressions: MatchAgg = {
+  val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)]
+
+  val logicalAggregates = aggregates.map(rexAggCall => {
 
 Review comment:
   Rename `rexAggCall` to `aggCall`?



[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704252#comment-16704252
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237472659
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((call.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = generateAggAccess(call)
+  aggregates += call
+  reusableInputUnboxingExprs((call.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def generateAggAccess(call: RexCall) = {
 
 Review comment:
   What about renaming this to `generateAggregation`?




> Support aggregation functions in the define and measures clause of 
> MatchRecognize
> -
>
> Key: FLINK-7599
> URL: https://issues.apache.org/jira/browse/FLINK-7599
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>






[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704247#comment-16704247
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237472868
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
 
 Review comment:
   Rename to `getOrComputeAggregation`?
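Either name describes the same shape: look the call up, and only generate and register it on a miss. A simplified sketch of that pattern follows, using plain strings as keys in place of `RexCall` and a hypothetical `Expr` case class in place of `GeneratedExpression`; it is not the code from the PR.

```scala
import scala.collection.mutable

// Simplified stand-in for GeneratedExpression: a result term plus the code computing it.
final case class Expr(resultTerm: String, code: String)

class AggCacheSketch {
  // Calls registered so far, in first-seen order.
  private val aggregates = mutable.ListBuffer.empty[String]
  // Cache keyed by the textual form of the call.
  private val cache = mutable.Map.empty[String, Expr]

  def registered: List[String] = aggregates.toList

  // Returns the cached expression, or generates, registers and caches a new one.
  def getOrAdd(call: String): Expr = cache.get(call) match {
    case Some(expr) => expr
    case None =>
      val expr = Expr(s"result_${aggregates.size}", s"/* code computing $call */")
      aggregates += call
      cache(call) = expr
      expr
  }
}
```

Calling `getOrAdd` twice with the same key registers the aggregate once and returns the same expression both times, which is the behaviour the method name should convey.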




> Support aggregation functions in the define and measures clause of 
> MatchRecognize
> -
>
> Key: FLINK-7599
> URL: https://issues.apache.org/jira/browse/FLINK-7599
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>






[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704246#comment-16704246
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237727039
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +582,245 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((aggCall.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+  aggregates += aggCall
+  reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def doGenerateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val primitiveSingleResultTypeTerm = 
primitiveTypeTermForTypeInfo(singleResultType)
+  val boxedSingleResultTypeTerm = 
boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
 
 Review comment:
   `patternName` doesn't reflect what this variable holds. What about renaming 
`patternName` to `eventsExpr` or something similar?




> Support aggregation functions in the define and measures clause of 
> MatchRecognize
> -
>
> Key: FLINK-7599
> URL: https://issues.apache.org/jira/browse/FLINK-7599
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>






[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704245#comment-16704245
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237466962
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -71,12 +78,28 @@ class MatchCodeGenerator(
   private var offset: Int = 0
   private var first : Boolean = false
 
+  /**
+* Flags that tells if we generate expressions inside an aggregate. It 
tells how to access input
 
 Review comment:
   Flags -> Flag




> Support aggregation functions in the define and measures clause of 
> MatchRecognize
> -
>
> Key: FLINK-7599
> URL: https://issues.apache.org/jira/browse/FLINK-7599
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>






[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704250#comment-16704250
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237738914
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +582,245 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((aggCall.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+  aggregates += aggCall
+  reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def doGenerateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val primitiveSingleResultTypeTerm = 
primitiveTypeTermForTypeInfo(singleResultType)
+  val boxedSingleResultTypeTerm = 
boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = 
calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg = if (nullCheck) {
+j"""
+   |boolean $singleResultNullTerm;
+   |$primitiveSingleResultTypeTerm $singleResultTerm;
+   |if ($resultRowTerm.getField(${aggregates.size}) != null) {
+   |  $singleResultTerm = ($boxedSingleResultTypeTerm) $resultRowTerm
+   |.getField(${aggregates.size});
+   |  $singleResultNullTerm = false;
+   |} else {
+   |  $singleResultNullTerm = true;
+   |  $singleResultTerm = $defaultValue;
+   |}
+   |""".stripMargin
+  } else {
+j"""
+   |boolean $singleResultNullTerm = false;
+   |$primitiveSingleResultTypeTerm $singleResultTerm =
+   |($boxedSingleResultTypeTerm) 
$resultRowTerm.getField(${aggregates.size});
+   |""".stripMargin
+  }
+
+  reusablePerRecordStatements += codeForSingleAgg
+
+  GeneratedExpression(singleResultTerm, singleResultNullTerm, NO_CODE, 
singleResultType)
+}
+
+def generateAggFunction() : Unit = {
 
 Review comment:
   Remove the whitespace before the colon




> Support aggregation functions in the define and measures clause of 
> MatchRecognize
> -
>
> Key: FLINK-7599
> URL: https://issues.apache.org/jira/browse/FLINK-7599
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>






[GitHub] dianfu commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE

2018-11-29 Thread GitBox
dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237472868
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
 
 Review comment:
   Rename to `getOrComputeAggregation`?




[GitHub] dianfu commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE

2018-11-29 Thread GitBox
dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237731718
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +582,245 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((aggCall.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+  aggregates += aggCall
+  reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def doGenerateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val primitiveSingleResultTypeTerm = 
primitiveTypeTermForTypeInfo(singleResultType)
+  val boxedSingleResultTypeTerm = 
boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = 
calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg = if (nullCheck) {
+j"""
+   |boolean $singleResultNullTerm;
+   |$primitiveSingleResultTypeTerm $singleResultTerm;
+   |if ($resultRowTerm.getField(${aggregates.size}) != null) {
+   |  $singleResultTerm = ($boxedSingleResultTypeTerm) $resultRowTerm
+   |.getField(${aggregates.size});
+   |  $singleResultNullTerm = false;
+   |} else {
+   |  $singleResultNullTerm = true;
+   |  $singleResultTerm = $defaultValue;
+   |}
+   |""".stripMargin
+  } else {
+j"""
+   |boolean $singleResultNullTerm = false;
+   |$primitiveSingleResultTypeTerm $singleResultTerm =
+   |($boxedSingleResultTypeTerm) 
$resultRowTerm.getField(${aggregates.size});
+   |""".stripMargin
+  }
+
+  reusablePerRecordStatements += codeForSingleAgg
+
+  GeneratedExpression(singleResultTerm, singleResultNullTerm, NO_CODE, 
singleResultType)
+}
+
+def generateAggFunction() : Unit = {
+  val matchAgg = extractAggregatesAndExpressions
+
+  val aggGenerator = new AggregationCodeGenerator(config, false, input, 
None)
+
+  val aggFunc = aggGenerator.generateAggregations(
+s"AggFunction_$variableUID",
+matchAgg.inputExprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)),
+matchAgg.aggregations.map(_.aggFunction).toArray,
+matchAgg.aggregations.map(_.inputIndices).toArray,
+matchAgg.aggregations.indices.toArray,
+Array.fill(matchAgg.aggregations.size)(false),
+isStateBackedDataViews = false,
+partialResults = false,
+Array.emptyIntArray,
+None,
+matchAgg.aggregations.size,
+needRetract = false,
+needMerge = false,
+needReset = false,
+None
+  )
+
+  reusableMemberStatements.add(aggFunc.code)
+
+  val transformFuncName = s"transformRowForAgg_$variableUID"
+  val inputTransform: String = generateAggInputExprEvaluation(
+matchAgg.inputExprs,
+transformFuncName)
+
+  generateAggCalculation(aggFunc, transformFuncName, inputTransform)
+}
+
+private case class LogicalMatchAggCall(
+  function: SqlAggFunction,
+  inputTypes: Seq[RelDataType],
+  exprIndices: Seq[Int]
+)
+
+private case class MatchAggCall(
+  aggFunction: TableAggregateFunction[_, _],
+  inputIndices: Array[Int],
+  dataViews: Seq[DataViewSpec[_]]
+)
+
+private case class MatchAgg(
+  aggregations: Seq[MatchAggCall],
+  inputExprs: Seq[RexNode]
+)
+
+private def extractAggregatesAndExpressions: MatchAgg = {
+  val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)]
+
+  val logicalAggregates = aggregates.map(rexAggCall => {
+val callsWithIndices = rexAggCall.operands.asScala.map(innerCall => {
+  inputRows.get(innerCall.toString) match {
+case Some(x) =>
+  x
+
+case None =>
+  val 

[GitHub] dianfu commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE

2018-11-29 Thread GitBox
dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237738914
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +582,245 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((aggCall.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+  aggregates += aggCall
+  reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def doGenerateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val primitiveSingleResultTypeTerm = 
primitiveTypeTermForTypeInfo(singleResultType)
+  val boxedSingleResultTypeTerm = 
boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = 
calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg = if (nullCheck) {
+j"""
+   |boolean $singleResultNullTerm;
+   |$primitiveSingleResultTypeTerm $singleResultTerm;
+   |if ($resultRowTerm.getField(${aggregates.size}) != null) {
+   |  $singleResultTerm = ($boxedSingleResultTypeTerm) $resultRowTerm
+   |.getField(${aggregates.size});
+   |  $singleResultNullTerm = false;
+   |} else {
+   |  $singleResultNullTerm = true;
+   |  $singleResultTerm = $defaultValue;
+   |}
+   |""".stripMargin
+  } else {
+j"""
+   |boolean $singleResultNullTerm = false;
+   |$primitiveSingleResultTypeTerm $singleResultTerm =
+   |($boxedSingleResultTypeTerm) 
$resultRowTerm.getField(${aggregates.size});
+   |""".stripMargin
+  }
+
+  reusablePerRecordStatements += codeForSingleAgg
+
+  GeneratedExpression(singleResultTerm, singleResultNullTerm, NO_CODE, 
singleResultType)
+}
+
+def generateAggFunction() : Unit = {
 
 Review comment:
   Remove the whitespace before the colon




[GitHub] dianfu commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE

2018-11-29 Thread GitBox
dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237724165
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -42,6 +50,90 @@ import scala.collection.mutable
 /**
   * A code generator for generating CEP related functions.
   *
+  * Aggregates are generated as follows:
+  * 1. all aggregate [[RexCall]]s are grouped by corresponding pattern variable
+  * 2. even if the same aggregation is used multiple times in an expression
+  *(e.g. SUM(A.price) > SUM(A.price) + 1) it will be calculated once. To 
do so [[AggBuilder]]
+  *keeps set of already seen different aggregation calls, and reuses the 
code to access
+  *appropriate field of aggregation result
+  * 3. after translating every expression (either in [[generateCondition]] or 
in
+  *[[generateOneRowPerMatchExpression]]) there will be generated code for
+  *   * [[GeneratedFunction]], which will be an inner class
 
 Review comment:
   code format
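
The reuse rule in point 2 of the Scaladoc above can be pictured with a small sketch. This is plain Java and not the actual `AggBuilder`; the class name `AggReuseSketch` and its methods are hypothetical, and the only behaviour taken from the diff is that an aggregate call is keyed by its string form and assigned the next free field index of the result row.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: mimics the reuse rule, not the real AggBuilder. */
final class AggReuseSketch {

    // Maps the textual form of an aggregate call to its field index in the result row.
    private final Map<String, Integer> fieldIndexByCall = new LinkedHashMap<>();

    /** Returns the result-row index for the call, registering it on first sight. */
    int getOrAdd(String aggCall) {
        Integer known = fieldIndexByCall.get(aggCall);
        if (known != null) {
            return known; // already seen: reuse the same field
        }
        int index = fieldIndexByCall.size(); // next free field
        fieldIndexByCall.put(aggCall, index);
        return index;
    }

    /** Number of distinct aggregates that actually need to be computed. */
    int distinctCount() {
        return fieldIndexByCall.size();
    }

    public static void main(String[] args) {
        AggReuseSketch builder = new AggReuseSketch();
        // SUM(A.price) > SUM(A.price) + 1 references the same aggregate twice.
        int left = builder.getOrAdd("SUM(A.price)");
        int right = builder.getOrAdd("SUM(A.price)");
        System.out.println(left + " " + right + " " + builder.distinctCount());
    }
}
```

Both references resolve to the same field, so only one aggregate has to be computed for that pattern variable.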




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704249#comment-16704249
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237737335
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +582,245 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((aggCall.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+  aggregates += aggCall
+  reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def doGenerateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
 
 Review comment:
   What about removing the prefix `single`?




[GitHub] dianfu commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE

2018-11-29 Thread GitBox
dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237466962
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -71,12 +78,28 @@ class MatchCodeGenerator(
   private var offset: Int = 0
   private var first : Boolean = false
 
+  /**
+* Flags that tells if we generate expressions inside an aggregate. It 
tells how to access input
 
 Review comment:
   Flags -> Flag




[GitHub] dianfu commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE

2018-11-29 Thread GitBox
dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237737176
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +582,245 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((aggCall.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+  aggregates += aggCall
+  reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def doGenerateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val primitiveSingleResultTypeTerm = 
primitiveTypeTermForTypeInfo(singleResultType)
+  val boxedSingleResultTypeTerm = 
boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = 
calculateAgg_$variableUID(${patternName.resultTerm});
 
 Review comment:
   Define `calculateAgg_$variableUID` as a variable, such as `val 
calculateAggFuncName = s"calculateAgg_$variableUID"`




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704254#comment-16704254
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237726170
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +582,245 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((aggCall.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+  aggregates += aggCall
+  reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def doGenerateAggAccess(call: RexCall) = {
 
 Review comment:
   rename to aggCall




[GitHub] dianfu commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE

2018-11-29 Thread GitBox
dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237726170
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +582,245 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((aggCall.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+  aggregates += aggCall
+  reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def doGenerateAggAccess(call: RexCall) = {
 
 Review comment:
   rename to aggCall




[GitHub] dianfu commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE

2018-11-29 Thread GitBox
dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237472659
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +498,223 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def getOrAddAggregation(call: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((call.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = generateAggAccess(call)
+  aggregates += call
+  reusableInputUnboxingExprs((call.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def generateAggAccess(call: RexCall) = {
 
 Review comment:
   What about renaming to generateAggregation?




[GitHub] dianfu commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE

2018-11-29 Thread GitBox
dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237737335
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +582,245 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((aggCall.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+  aggregates += aggCall
+  reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def doGenerateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
 
 Review comment:
   What about removing the prefix `single`?




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704244#comment-16704244
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237737176
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +582,245 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), 
fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((aggCall.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+  aggregates += aggCall
+  reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def doGenerateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val primitiveSingleResultTypeTerm = 
primitiveTypeTermForTypeInfo(singleResultType)
+  val boxedSingleResultTypeTerm = 
boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = 
calculateAgg_$variableUID(${patternName.resultTerm});
 
 Review comment:
   Define `calculateAgg_$variableUID` as a variable, such as `val 
calculateAggFuncName = s"calculateAgg_$variableUID"`




[jira] [Created] (FLINK-11034) Provide "rewriting config" to file system factory

2018-11-29 Thread Wei-Che Wei (JIRA)
Wei-Che Wei created FLINK-11034:
---

 Summary: Provide "rewriting config" to file system factory
 Key: FLINK-11034
 URL: https://issues.apache.org/jira/browse/FLINK-11034
 Project: Flink
  Issue Type: Improvement
Reporter: Wei-Che Wei


The discussion in this mailing list thread 
[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/how-to-override-s3-key-config-in-flink-job-td24606.html]
showed that it is not possible to override the file system factory's config 
when submitting a Flink job.

That means all jobs in a session cluster share the same config, and a user 
cannot use different configurations for checkpointing and for the file sink. 
For example, a user might use different S3 buckets for checkpointing and for 
the file sink, with each bucket having its own S3 access key for management 
reasons.

We might need to provide a way to override the configuration when calling the 
file system factory's "get" method, so that user-facing components such as 
checkpointing or the file sink can take the overriding config from the user 
and create a file system with those changes applied.
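
One way to picture the proposal is the hypothetical sketch below. It is plain Java and uses no Flink classes; the class name `FsConfigOverrideSketch`, the method `effectiveConfig`, and the key names and values are assumptions for illustration only, not an existing API. It shows the cluster-wide config staying shared while each component layers its own overrides on top before a file system would be created.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: layering per-component overrides over a shared config. */
final class FsConfigOverrideSketch {

    /** Returns a new map: cluster-wide settings with the overrides applied on top. */
    static Map<String, String> effectiveConfig(
            Map<String, String> clusterConfig, Map<String, String> overrides) {
        // Copy first, so the shared cluster config is never modified.
        Map<String, String> merged = new HashMap<>(clusterConfig);
        // On a key collision the override wins.
        merged.putAll(overrides);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> cluster = new HashMap<>();
        cluster.put("s3.access-key", "cluster-key");
        cluster.put("s3.endpoint", "s3.example.com");

        // Hypothetical override supplied only for the file sink.
        Map<String, String> sinkOverrides = new HashMap<>();
        sinkOverrides.put("s3.access-key", "sink-key");

        Map<String, String> sinkConfig = effectiveConfig(cluster, sinkOverrides);
        System.out.println(sinkConfig.get("s3.access-key"));
    }
}
```

Checkpointing could pass a different override map (or none) and would see its own access key without affecting the sink.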





[GitHub] zhijiangW commented on a change in pull request #7185: [FLINK-10884] [yarn/mesos] adjust container memory param to set a safe margin from offheap memory

2018-11-29 Thread GitBox
zhijiangW commented on a change in pull request #7185: [FLINK-10884] 
[yarn/mesos]  adjust  container memory param  to set a safe margin from offheap 
memory
URL: https://github.com/apache/flink/pull/7185#discussion_r237743628
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
 ##
 @@ -158,8 +158,10 @@ public static ContaineredTaskManagerParameters create(
 
// (2) split the remaining Java memory between heap and off-heap
final long heapSizeMB = 
TaskManagerServices.calculateHeapSizeMB(containerMemoryMB - cutoffMB, config);
-   // use the cut-off memory for off-heap (that was its intention)
-   final long offHeapSizeMB = containerMemoryMB - heapSizeMB;
+   // (3) try to compute the offHeapMemory from a safe margin
+   final long restMemoryMB = containerMemoryMB - heapSizeMB;
+   final long offHeapCutoffMemory = 
calculateOffHeapCutoffMB(config, restMemoryMB);
 
 Review comment:
   We already have `containerized.heap-cutoff-ratio` for reserving some memory 
for other uses. `heapSizeMB` is calculated from `containerMemoryMB - cutoffMB`, 
so `heapSizeMB + offHeapSizeMB` should equal `containerMemoryMB - cutoffMB`.
   
   You further extend the `cutoff-ratio` with 
`containerized.offheap-cutoff-ratio`. I think there are two options:
   
   1. Change the existing `containerized.heap-cutoff-ratio` to 
`containerized.cutoff-ratio`, meaning that it reserves some physical memory 
covering both heap and off-heap.
   
   2. Split it into two parameters, as you propose. I am not sure whether this 
brings any extra benefit over the first option, and it may complicate things a 
little, because memory can be further divided into heap, direct, and native 
(used by the RocksDB state backend). Direct and native memory can both be 
regarded as off-heap, generally speaking. If we do this, do we also need a 
`containerized.offheap-cutoff-min` to match the existing 
`containerized.heap-cutoff-min`?
   
   BTW, I think you can increase the current `containerized.heap-cutoff-ratio` 
and `containerized.heap-cutoff-min` to avoid the container being killed for 
exceeding its memory limit. :)
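
A rough sketch of the arithmetic behind this comment, in plain Java rather than Flink's actual code. The class name, the `heapFraction` parameter (a hypothetical stand-in for whatever `TaskManagerServices.calculateHeapSizeMB` decides), and the sample values are assumptions for illustration. It only demonstrates the invariant that heap plus off-heap equals the container memory minus the cutoff.

```java
/** Illustrative only: the container memory split discussed above, not Flink's implementation. */
final class ContainerMemorySketch {

    /** Memory held back from the JVM: the larger of ratio * container and the minimum. */
    static long cutoffMB(long containerMemoryMB, double cutoffRatio, long cutoffMinMB) {
        long byRatio = (long) (containerMemoryMB * cutoffRatio);
        return Math.max(byRatio, cutoffMinMB);
    }

    /**
     * Splits what is left after the cutoff into {heapMB, offHeapMB}.
     * heapFraction is a hypothetical knob standing in for the real heap calculation.
     */
    static long[] split(long containerMemoryMB, long cutoffMB, double heapFraction) {
        long usableMB = containerMemoryMB - cutoffMB;
        if (usableMB <= 0) {
            throw new IllegalArgumentException("cutoff leaves no memory for the JVM");
        }
        long heapMB = (long) (usableMB * heapFraction);
        long offHeapMB = usableMB - heapMB; // so heap + off-heap == container - cutoff
        return new long[] {heapMB, offHeapMB};
    }

    public static void main(String[] args) {
        long containerMB = 4096;
        long cutoff = cutoffMB(containerMB, 0.25, 600);
        long[] parts = split(containerMB, cutoff, 0.75);
        System.out.println("cutoff=" + cutoff + " heap=" + parts[0] + " offHeap=" + parts[1]);
    }
}
```

Raising the ratio or the minimum enlarges the cutoff and shrinks both heap and off-heap together, which is the effect of the suggestion at the end of the comment.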




[jira] [Commented] (FLINK-10884) Flink on yarn TM container will be killed by nodemanager because of the exceeded physical memory.

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704259#comment-16704259
 ] 

ASF GitHub Bot commented on FLINK-10884:


zhijiangW commented on a change in pull request #7185: [FLINK-10884] 
[yarn/mesos]  adjust  container memory param  to set a safe margin from offheap 
memory
URL: https://github.com/apache/flink/pull/7185#discussion_r237743628
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
 ##
 @@ -158,8 +158,10 @@ public static ContaineredTaskManagerParameters create(
 
// (2) split the remaining Java memory between heap and off-heap
final long heapSizeMB = 
TaskManagerServices.calculateHeapSizeMB(containerMemoryMB - cutoffMB, config);
-   // use the cut-off memory for off-heap (that was its intention)
-   final long offHeapSizeMB = containerMemoryMB - heapSizeMB;
+   // (3) try to compute the offHeapMemory from a safe margin
+   final long restMemoryMB = containerMemoryMB - heapSizeMB;
+   final long offHeapCutoffMemory = 
calculateOffHeapCutoffMB(config, restMemoryMB);
 
 Review comment:
   We already have `containerized.heap-cutoff-ratio` for reserving some memory 
for other uses. `heapSizeMB` is calculated from `containerMemoryMB - cutoffMB`, 
so `heapSizeMB + offHeapSizeMB` should equal `containerMemoryMB - cutoffMB`.
   
   You further extend the `cutoff-ratio` with 
`containerized.offheap-cutoff-ratio`. I think there are two options:
   
   1. Change the existing `containerized.heap-cutoff-ratio` to 
`containerized.cutoff-ratio`, meaning that it reserves some physical memory 
covering both heap and off-heap.
   
   2. Split it into two parameters, as you propose. I am not sure whether this 
brings any extra benefit over the first option, and it may complicate things a 
little, because memory can be further divided into heap, direct, and native 
(used by the RocksDB state backend). Direct and native memory can both be 
regarded as off-heap, generally speaking. If we do this, do we also need a 
`containerized.offheap-cutoff-min` to match the existing 
`containerized.heap-cutoff-min`?
   
   BTW, I think you can increase the current `containerized.heap-cutoff-ratio` 
and `containerized.heap-cutoff-min` to avoid the container being killed for 
exceeding its memory limit. :)




> Flink on yarn  TM container will be killed by nodemanager because of  the 
> exceeded  physical memory.
> 
>
> Key: FLINK-10884
> URL: https://issues.apache.org/jira/browse/FLINK-10884
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Core
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
> Environment: version  : 1.6.2 
> module : flink on yarn
> centos  jdk1.8
> hadoop 2.7
>Reporter: wgcn
>Assignee: wgcn
>Priority: Major
>  Labels: pull-request-available, yarn
>
> The TM container will be killed by the NodeManager for exceeding its 
> physical memory limit. I found that the launch context for the TM container 
> sets "container memory = heap memory + offHeapSizeMB" in the class 
> org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters, 
> lines 160 to 166. I set a safety margin for the whole memory the container 
> uses. For example, if the container limit is 3g of memory, the sum 
> "heap memory + offHeapSizeMB" is set to 2.4g to prevent the container from 
> being killed. Do we have a ready-made solution, or can I commit my solution?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] maqingxiang commented on issue #7197: [hotfix][flink-runtime] add try catch when find a unique file name for the spilling channel

2018-11-29 Thread GitBox
maqingxiang commented on issue #7197: [hotfix][flink-runtime] add try catch 
when find a unique file name for the spilling channel
URL: https://github.com/apache/flink/pull/7197#issuecomment-443067252
 
 
   @zhijiangW thanks for your review.
   
   Sorry, I will create a JIRA ticket next time. :)
   




[GitHub] zjffdu commented on a change in pull request #7121: [FLINK-9555]Support table api in scala shell

2018-11-29 Thread GitBox
zjffdu commented on a change in pull request #7121: [FLINK-9555]Support table 
api in scala shell
URL: https://github.com/apache/flink/pull/7121#discussion_r237735688
 
 

 ##
 File path: 
flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
 ##
 @@ -168,6 +168,56 @@ class ScalaShellITCase extends TestLogger {
 Assert.assertTrue(output.contains("WC(world,10)"))
   }
 
+  @Test
+  def testSimpleSelectWithFilterBatchTableAPIQuery: Unit = {
+val input =
+  """
+|import _root_.scala.collection.mutable
+|val data = new mutable.MutableList[(Int, Long, String)]
+|data.+=((1, 1L, "Hi"))
+|data.+=((2, 2L, "Hello"))
+|data.+=((3, 2L, "Hello world"))
+|val t = benv.fromCollection(data).toTable(btenv, 'a, 'b, 'c).select("*").where('a % 2 === 1 )
 
 Review comment:
   MutableList is not necessary here.  Use this instead
   ```
   val data = Seq(
     (1, 1L, "Hi"),
     (2, 2L, "Hello"),
     (3, 2L, "Hello world")
   )
   ``` 




[GitHub] zjffdu commented on a change in pull request #7121: [FLINK-9555]Support table api in scala shell

2018-11-29 Thread GitBox
zjffdu commented on a change in pull request #7121: [FLINK-9555]Support table 
api in scala shell
URL: https://github.com/apache/flink/pull/7121#discussion_r237735688
 
 

 ##
 File path: 
flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
 ##
 @@ -168,6 +168,56 @@ class ScalaShellITCase extends TestLogger {
 Assert.assertTrue(output.contains("WC(world,10)"))
   }
 
+  @Test
+  def testSimpleSelectWithFilterBatchTableAPIQuery: Unit = {
+val input =
+  """
+|import _root_.scala.collection.mutable
+|val data = new mutable.MutableList[(Int, Long, String)]
+|data.+=((1, 1L, "Hi"))
+|data.+=((2, 2L, "Hello"))
+|data.+=((3, 2L, "Hello world"))
+|val t = benv.fromCollection(data).toTable(btenv, 'a, 'b, 'c).select("*").where('a % 2 === 1 )
 
 Review comment:
   MutableList is not necessary here. 
   ```
   val data = Seq(
     (1, 1L, "Hi"),
     (2, 2L, "Hello"),
     (3, 2L, "Hello world")
   )
   ``` 




[jira] [Commented] (FLINK-9555) Support table api in scala shell

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704205#comment-16704205
 ] 

ASF GitHub Bot commented on FLINK-9555:
---

zjffdu commented on a change in pull request #7121: [FLINK-9555]Support table 
api in scala shell
URL: https://github.com/apache/flink/pull/7121#discussion_r237735688
 
 

 ##
 File path: 
flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
 ##
 @@ -168,6 +168,56 @@ class ScalaShellITCase extends TestLogger {
 Assert.assertTrue(output.contains("WC(world,10)"))
   }
 
+  @Test
+  def testSimpleSelectWithFilterBatchTableAPIQuery: Unit = {
+val input =
+  """
+|import _root_.scala.collection.mutable
+|val data = new mutable.MutableList[(Int, Long, String)]
+|data.+=((1, 1L, "Hi"))
+|data.+=((2, 2L, "Hello"))
+|data.+=((3, 2L, "Hello world"))
+|val t = benv.fromCollection(data).toTable(btenv, 'a, 'b, 'c).select("*").where('a % 2 === 1 )
 
 Review comment:
   MutableList is not necessary here. 
   ```
   val data = Seq(
     (1, 1L, "Hi"),
     (2, 2L, "Hello"),
     (3, 2L, "Hello world")
   )
   ``` 




> Support table api in scala shell
> 
>
> Key: FLINK-9555
> URL: https://issues.apache.org/jira/browse/FLINK-9555
> Project: Flink
>  Issue Type: New Feature
>  Components: Scala Shell
>Affects Versions: 1.5.0
>Reporter: Jeff Zhang
>Assignee: shuiqiangchen
>Priority: Major
>  Labels: pull-request-available
>
> It would be nice to have the Table API available in the Scala shell so that 
> users can try the Table API interactively. 





[GitHub] klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS

2018-11-29 Thread GitBox
klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] 
Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-443079619
 
 
   Thanks a lot for your reviews, I learnt a lot from them. @azagrebin 
   I fixed the Javadoc. Thank you again.




[jira] [Commented] (FLINK-9555) Support table api in scala shell

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704207#comment-16704207
 ] 

ASF GitHub Bot commented on FLINK-9555:
---

zjffdu commented on a change in pull request #7121: [FLINK-9555]Support table 
api in scala shell
URL: https://github.com/apache/flink/pull/7121#discussion_r237735688
 
 

 ##
 File path: 
flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
 ##
 @@ -168,6 +168,56 @@ class ScalaShellITCase extends TestLogger {
 Assert.assertTrue(output.contains("WC(world,10)"))
   }
 
+  @Test
+  def testSimpleSelectWithFilterBatchTableAPIQuery: Unit = {
+val input =
+  """
+|import _root_.scala.collection.mutable
+|val data = new mutable.MutableList[(Int, Long, String)]
+|data.+=((1, 1L, "Hi"))
+|data.+=((2, 2L, "Hello"))
+|data.+=((3, 2L, "Hello world"))
+|val t = benv.fromCollection(data).toTable(btenv, 'a, 'b, 'c).select("*").where('a % 2 === 1 )
 
 Review comment:
   MutableList is not necessary here.  Use this instead
   ```
   val data = Seq(
     (1, 1L, "Hi"),
     (2, 2L, "Hello"),
     (3, 2L, "Hello world")
   )
   ``` 




> Support table api in scala shell
> 
>
> Key: FLINK-9555
> URL: https://issues.apache.org/jira/browse/FLINK-9555
> Project: Flink
>  Issue Type: New Feature
>  Components: Scala Shell
>Affects Versions: 1.5.0
>Reporter: Jeff Zhang
>Assignee: shuiqiangchen
>Priority: Major
>  Labels: pull-request-available
>
> It would be nice to have the Table API available in the Scala shell so that 
> users can try the Table API interactively. 





[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704215#comment-16704215
 ] 

ASF GitHub Bot commented on FLINK-10461:


klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] 
Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-443079619
 
 
   Thanks a lot for your reviews, I learnt a lot from them. @azagrebin 
   I fixed the Javadoc. Thank you again.




> Speed up download file procedure when restore 
> --
>
> Key: FLINK-10461
> URL: https://issues.apache.org/jira/browse/FLINK-10461
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Congxian Qiu
>Assignee: Congxian Qiu
>Priority: Major
>  Labels: pull-request-available
>
> On the current master branch, restore downloads files from DFS in a single 
> thread; this could be sped up by downloading the state from DFS with 
> multiple threads.
>  
> In my company, state can reach several terabytes, so the restore procedure 
> becomes a little slow. After a bit of digging, I found that state is 
> downloaded from DFS using a single thread; multiple threads could be used to 
> speed this up.
> I measured the time to download ~2 terabytes of state from DFS: it took 
> 640+s with a single thread and 130+s with 5 threads.
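
The idea described above can be sketched roughly as follows. This is only an 
illustration under simplifying assumptions, not the code from the pull 
request: `ParallelDownloadSketch` and `downloadAll` are hypothetical names, and 
a plain file copy stands in for reading a state file from DFS.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: copies every "remote" file into localDir using a fixed thread pool.
final class ParallelDownloadSketch {

    static void downloadAll(List<Path> remoteFiles, Path localDir, int threads)
            throws IOException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (Path remote : remoteFiles) {
                // One task per file; Files.copy stands in for the DFS read.
                pending.add(pool.submit(() -> {
                    Files.copy(remote, localDir.resolve(remote.getFileName()),
                            StandardCopyOption.REPLACE_EXISTING);
                    return null;
                }));
            }
            // Wait for all copies and surface the first failure to the caller.
            for (Future<?> task : pending) {
                try {
                    task.get();
                } catch (ExecutionException e) {
                    throw new IOException("download failed", e.getCause());
                }
            }
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Waiting on every future before returning keeps the restore semantics the same 
as the single-threaded version: the caller only proceeds once all files are 
local, and any failed download aborts the restore.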





[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704248#comment-16704248
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237739790
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +582,245 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((aggCall.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+  aggregates += aggCall
+  reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def doGenerateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val primitiveSingleResultTypeTerm = primitiveTypeTermForTypeInfo(singleResultType)
+  val boxedSingleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg = if (nullCheck) {
+j"""
+   |boolean $singleResultNullTerm;
+   |$primitiveSingleResultTypeTerm $singleResultTerm;
+   |if ($resultRowTerm.getField(${aggregates.size}) != null) {
+   |  $singleResultTerm = ($boxedSingleResultTypeTerm) $resultRowTerm
+   |.getField(${aggregates.size});
+   |  $singleResultNullTerm = false;
+   |} else {
+   |  $singleResultNullTerm = true;
+   |  $singleResultTerm = $defaultValue;
+   |}
+   |""".stripMargin
+  } else {
+j"""
+   |boolean $singleResultNullTerm = false;
+   |$primitiveSingleResultTypeTerm $singleResultTerm =
+   |($boxedSingleResultTypeTerm) $resultRowTerm.getField(${aggregates.size});
+   |""".stripMargin
+  }
+
+  reusablePerRecordStatements += codeForSingleAgg
+
+  GeneratedExpression(singleResultTerm, singleResultNullTerm, NO_CODE, singleResultType)
+}
+
+def generateAggFunction() : Unit = {
+  val matchAgg = extractAggregatesAndExpressions
+
+  val aggGenerator = new AggregationCodeGenerator(config, false, input, None)
+
+  val aggFunc = aggGenerator.generateAggregations(
+s"AggFunction_$variableUID",
+matchAgg.inputExprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)),
+matchAgg.aggregations.map(_.aggFunction).toArray,
+matchAgg.aggregations.map(_.inputIndices).toArray,
+matchAgg.aggregations.indices.toArray,
+Array.fill(matchAgg.aggregations.size)(false),
+isStateBackedDataViews = false,
+partialResults = false,
+Array.emptyIntArray,
+None,
+matchAgg.aggregations.size,
+needRetract = false,
+needMerge = false,
+needReset = false,
+None
+  )
+
+  reusableMemberStatements.add(aggFunc.code)
+
+  val transformFuncName = s"transformRowForAgg_$variableUID"
+  val inputTransform: String = generateAggInputExprEvaluation(
+matchAgg.inputExprs,
+transformFuncName)
+
+  generateAggCalculation(aggFunc, transformFuncName, inputTransform)
+}
+
+private case class LogicalMatchAggCall(
+  function: SqlAggFunction,
+  inputTypes: Seq[RelDataType],
+  exprIndices: Seq[Int]
+)
+
+private case class MatchAggCall(
+  aggFunction: TableAggregateFunction[_, _],
+  inputIndices: Array[Int],
+  dataViews: Seq[DataViewSpec[_]]
+)
+
+private case class MatchAgg(
+  aggregations: Seq[MatchAggCall],
+  inputExprs: Seq[RexNode]
+)
+
+private def extractAggregatesAndExpressions: MatchAgg = {
+  val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)]
+
+  val logicalAggregates = 

[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704253#comment-16704253
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237724165
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -42,6 +50,90 @@ import scala.collection.mutable
 /**
   * A code generator for generating CEP related functions.
   *
+  * Aggregates are generated as follows:
+  * 1. all aggregate [[RexCall]]s are grouped by corresponding pattern variable
+  * 2. even if the same aggregation is used multiple times in an expression
+  *(e.g. SUM(A.price) > SUM(A.price) + 1) it will be calculated once. To do so [[AggBuilder]]
+  *keeps set of already seen different aggregation calls, and reuses the code to access
+  *appropriate field of aggregation result
+  * 3. after translating every expression (either in [[generateCondition]] or in
+  *[[generateOneRowPerMatchExpression]]) there will be generated code for
+  *   * [[GeneratedFunction]], which will be an inner class
 
 Review comment:
   code format
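
   As a side note on point 2 of that comment, the "calculate once, reuse the 
access code" behaviour can be sketched like this. It is a simplified 
illustration, not the generator itself: `AggReuseSketch`, `accessTerm` and the 
`result$N` term format are hypothetical, and a string key stands in for the 
aggregate `RexCall`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: hands out one result term per distinct aggregate call.
final class AggReuseSketch {

    // Keyed by the textual form of the call; insertion order gives the field index.
    private final Map<String, String> termsByCall = new LinkedHashMap<>();

    /** Returns the term for a call, creating it only the first time the call is seen. */
    String accessTerm(String aggCall) {
        String term = termsByCall.get(aggCall);
        if (term == null) {
            term = "result$" + termsByCall.size();
            termsByCall.put(aggCall, term);
        }
        return term;
    }

    /** Number of aggregates that actually have to be computed. */
    int distinctAggregates() {
        return termsByCall.size();
    }
}
```

   For `SUM(A.price) > SUM(A.price) + 1`, both sides would resolve to the same 
term, so only one aggregate is computed.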




> Support aggregation functions in the define and measures clause of 
> MatchRecognize
> -
>
> Key: FLINK-7599
> URL: https://issues.apache.org/jira/browse/FLINK-7599
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>






[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704251#comment-16704251
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

dianfu commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237731718
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -442,4 +582,245 @@ class MatchCodeGenerator(
 
 generateFieldAccess(patternVariableRef.copy(code = NO_CODE), fieldRef.getIndex)
   }
+
+  class AggBuilder(variable: String) {
+
+private val aggregates = new mutable.ListBuffer[RexCall]()
+
+private val variableUID = newName("variable")
+
+private val resultRowTerm = newName(s"aggRow_$variableUID")
+
+private val rowTypeTerm = "org.apache.flink.types.Row"
+
+def generateAggAccess(aggCall: RexCall): GeneratedExpression = {
+  reusableInputUnboxingExprs.get((aggCall.toString, 0)) match  {
+case Some(expr) =>
+  expr
+
+case None =>
+  val exp: GeneratedExpression = doGenerateAggAccess(aggCall)
+  aggregates += aggCall
+  reusableInputUnboxingExprs((aggCall.toString, 0)) = exp
+  exp.copy(code = NO_CODE)
+  }
+}
+
+private def doGenerateAggAccess(call: RexCall) = {
+  val singleResultTerm = newName("result")
+  val singleResultNullTerm = newName("nullTerm")
+  val singleResultType = FlinkTypeFactory.toTypeInfo(call.`type`)
+  val primitiveSingleResultTypeTerm = primitiveTypeTermForTypeInfo(singleResultType)
+  val boxedSingleResultTypeTerm = boxedTypeTermForTypeInfo(singleResultType)
+
+  val patternName = findEventsByPatternName(variable)
+
+  val codeForAgg =
+j"""
+   |$rowTypeTerm $resultRowTerm = calculateAgg_$variableUID(${patternName.resultTerm});
+   |""".stripMargin
+
+  reusablePerRecordStatements += codeForAgg
+
+  val defaultValue = primitiveDefaultValue(singleResultType)
+  val codeForSingleAgg = if (nullCheck) {
+j"""
+   |boolean $singleResultNullTerm;
+   |$primitiveSingleResultTypeTerm $singleResultTerm;
+   |if ($resultRowTerm.getField(${aggregates.size}) != null) {
+   |  $singleResultTerm = ($boxedSingleResultTypeTerm) $resultRowTerm
+   |.getField(${aggregates.size});
+   |  $singleResultNullTerm = false;
+   |} else {
+   |  $singleResultNullTerm = true;
+   |  $singleResultTerm = $defaultValue;
+   |}
+   |""".stripMargin
+  } else {
+j"""
+   |boolean $singleResultNullTerm = false;
+   |$primitiveSingleResultTypeTerm $singleResultTerm =
+   |($boxedSingleResultTypeTerm) $resultRowTerm.getField(${aggregates.size});
+   |""".stripMargin
+  }
+
+  reusablePerRecordStatements += codeForSingleAgg
+
+  GeneratedExpression(singleResultTerm, singleResultNullTerm, NO_CODE, singleResultType)
+}
+
+def generateAggFunction() : Unit = {
+  val matchAgg = extractAggregatesAndExpressions
+
+  val aggGenerator = new AggregationCodeGenerator(config, false, input, None)
+
+  val aggFunc = aggGenerator.generateAggregations(
+s"AggFunction_$variableUID",
+matchAgg.inputExprs.map(r => FlinkTypeFactory.toTypeInfo(r.getType)),
+matchAgg.aggregations.map(_.aggFunction).toArray,
+matchAgg.aggregations.map(_.inputIndices).toArray,
+matchAgg.aggregations.indices.toArray,
+Array.fill(matchAgg.aggregations.size)(false),
+isStateBackedDataViews = false,
+partialResults = false,
+Array.emptyIntArray,
+None,
+matchAgg.aggregations.size,
+needRetract = false,
+needMerge = false,
+needReset = false,
+None
+  )
+
+  reusableMemberStatements.add(aggFunc.code)
+
+  val transformFuncName = s"transformRowForAgg_$variableUID"
+  val inputTransform: String = generateAggInputExprEvaluation(
+matchAgg.inputExprs,
+transformFuncName)
+
+  generateAggCalculation(aggFunc, transformFuncName, inputTransform)
+}
+
+private case class LogicalMatchAggCall(
+  function: SqlAggFunction,
+  inputTypes: Seq[RelDataType],
+  exprIndices: Seq[Int]
+)
+
+private case class MatchAggCall(
+  aggFunction: TableAggregateFunction[_, _],
+  inputIndices: Array[Int],
+  dataViews: Seq[DataViewSpec[_]]
+)
+
+private case class MatchAgg(
+  aggregations: Seq[MatchAggCall],
+  inputExprs: Seq[RexNode]
+)
+
+private def extractAggregatesAndExpressions: MatchAgg = {
+  val inputRows = new mutable.LinkedHashMap[String, (RexNode, Int)]
+
+  val logicalAggregates = 

[GitHub] twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread GitBox
twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237398634
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala
 ##
 @@ -31,30 +31,27 @@ import org.apache.flink.types.Row
 class IterativeConditionRunner(
 name: String,
 code: String)
-  extends IterativeCondition[Row]
-  with Compiler[IterativeCondition[Row]]
+  extends RichIterativeCondition[Row]
+  with Compiler[RichIterativeCondition[Row]]
   with Logging {
 
-  @transient private var function: IterativeCondition[Row] = _
+  @transient private var function: RichIterativeCondition[Row] = _
 
-  def init(): Unit = {
+  override def open(parameters: Configuration): Unit = {
 LOG.debug(s"Compiling IterativeCondition: $name \n\n Code:\n$code")
-// We cannot get user's classloader currently, see FLINK-6938 for details
-val clazz = compile(Thread.currentThread().getContextClassLoader, name, code)
+val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
 LOG.debug("Instantiating IterativeCondition.")
 function = clazz.newInstance()
-// TODO add logic for opening and closing the function once it can be a RichFunction
+FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+FunctionUtils.openFunction(function, parameters)
   }
 
   override def filter(value: Row, ctx: IterativeCondition.Context[Row]): Boolean = {
 function.filter(value, ctx)
   }
 
-  @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream): Unit = {
-in.defaultReadObject()
-if (function == null) {
-  init()
-}
+  override def close(): Unit = {
+super.close()
 
 Review comment:
   Remove this line?




[GitHub] twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread GitBox
twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237399713
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala
 ##
 @@ -31,30 +31,27 @@ import org.apache.flink.types.Row
 class IterativeConditionRunner(
 name: String,
 code: String)
-  extends IterativeCondition[Row]
-  with Compiler[IterativeCondition[Row]]
+  extends RichIterativeCondition[Row]
+  with Compiler[RichIterativeCondition[Row]]
   with Logging {
 
-  @transient private var function: IterativeCondition[Row] = _
+  @transient private var function: RichIterativeCondition[Row] = _
 
-  def init(): Unit = {
+  override def open(parameters: Configuration): Unit = {
 LOG.debug(s"Compiling IterativeCondition: $name \n\n Code:\n$code")
-// We cannot get user's classloader currently, see FLINK-6938 for details
-val clazz = compile(Thread.currentThread().getContextClassLoader, name, code)
+val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
 LOG.debug("Instantiating IterativeCondition.")
 function = clazz.newInstance()
-// TODO add logic for opening and closing the function once it can be a RichFunction
+FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+FunctionUtils.openFunction(function, parameters)
   }
 
   override def filter(value: Row, ctx: IterativeCondition.Context[Row]): Boolean = {
 function.filter(value, ctx)
   }
 
-  @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream): Unit = {
-in.defaultReadObject()
-if (function == null) {
-  init()
-}
+  override def close(): Unit = {
+super.close()
+function.close()
 
 Review comment:
   Use `org.apache.flink.api.common.functions.util.FunctionUtils#closeFunction` 
instead?
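
   The reason I would prefer the utility is roughly the following. This is a 
standalone sketch with stand-in types, not Flink's classes: `SketchFunction`, 
`SketchRichFunction` and `CloseSketch` are hypothetical, and the type check 
shown is how I recall `closeFunction` behaving, so please verify it.

```java
// Stand-in types for illustration only; they are NOT Flink's interfaces.
interface SketchFunction {}

interface SketchRichFunction extends SketchFunction {
    void close() throws Exception;
}

final class CloseSketch {

    /** Closes the function only if it actually has lifecycle methods. */
    static void closeFunction(SketchFunction function) throws Exception {
        // instanceof is false for null, so a function that was never
        // instantiated (e.g. because open() failed) is skipped safely.
        if (function instanceof SketchRichFunction) {
            ((SketchRichFunction) function).close();
        }
    }
}
```

   Calling `function.close()` directly would throw a NullPointerException in 
`close()` if compilation in `open()` had failed before `function` was assigned.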




[GitHub] twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread GitBox
twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237400562
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##
 @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase {
 // We do not assert the proctime in the result, cause it is currently
 
 Review comment:
   I guess this problem has not been solved with the new rich function?




[GitHub] twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread GitBox
twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237398248
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala
 ##
 @@ -31,30 +31,27 @@ import org.apache.flink.types.Row
 class IterativeConditionRunner(
 name: String,
 code: String)
-  extends IterativeCondition[Row]
-  with Compiler[IterativeCondition[Row]]
+  extends RichIterativeCondition[Row]
+  with Compiler[RichIterativeCondition[Row]]
   with Logging {
 
-  @transient private var function: IterativeCondition[Row] = _
+  @transient private var function: RichIterativeCondition[Row] = _
 
-  def init(): Unit = {
+  override def open(parameters: Configuration): Unit = {
 LOG.debug(s"Compiling IterativeCondition: $name \n\n Code:\n$code")
-// We cannot get user's classloader currently, see FLINK-6938 for details
-val clazz = compile(Thread.currentThread().getContextClassLoader, name, code)
+val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
 LOG.debug("Instantiating IterativeCondition.")
 function = clazz.newInstance()
-// TODO add logic for opening and closing the function once it can be a RichFunction
+FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
 
 Review comment:
   Is there a reason why a distributed cache is not supported in a 
`CepRuntimeContext`? We don't expose many context properties in SQL UDFs but 
the distributed cache apparently seems to be important.




[GitHub] twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread GitBox
twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237399688
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala
 ##
 @@ -60,17 +67,9 @@ class PatternSelectFunctionRunner(
 out.collect(outCRow)
   }
 
-  @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream): Unit = {
-in.defaultReadObject()
-
-if (outCRow == null) {
-  outCRow = new CRow(null, true)
-}
-
-if (function == null) {
-  init()
-}
+  override def close(): Unit = {
+super.close()
+function.close()
 
 Review comment:
   Use `org.apache.flink.api.common.functions.util.FunctionUtils#closeFunction` 
instead?




[jira] [Commented] (FLINK-10999) Adding time multiple times causes Runtime : java.sql.Timestamp cannot be cast to java.lang.Long

2018-11-29 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702890#comment-16702890
 ] 

Fabian Hueske commented on FLINK-10999:
---

Hi,

Processing-time and event-time are supported in the same table.
You have a type mismatch somewhere in your code. It's not easy to tell where 
exactly, because your example is quite complex.
I'd assume you are trying to call .rowtime on a Timestamp attribute (it should 
be a Long attribute).
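
To illustrate the kind of mismatch I mean, here is a standalone sketch. It is 
not taken from your job: `RowtimeTypeSketch` and `toEpochMillis` are made-up 
names, and the explicit conversion is just one possible way to end up with a 
Long field.

```java
import java.sql.Timestamp;

// Hypothetical sketch: a field declared as Long must not hold a Timestamp at runtime.
final class RowtimeTypeSketch {

    /** Converts a field holding either epoch millis or a Timestamp to epoch millis. */
    static long toEpochMillis(Object field) {
        if (field instanceof Long) {
            return (Long) field;
        }
        if (field instanceof Timestamp) {
            return ((Timestamp) field).getTime();
        }
        throw new IllegalArgumentException("unsupported time field: " + field);
    }

    public static void main(String[] args) {
        Object field = new Timestamp(1543449600000L);
        try {
            // Same kind of cast that fails inside LongSerializer.copy in the stack trace.
            Long broken = (Long) field;
            System.out.println(broken);
        } catch (ClassCastException e) {
            System.out.println("cast failed: " + e.getMessage());
        }
        // Converting explicitly yields the epoch millis instead.
        System.out.println(toEpochMillis(field));
    }
}
```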

> Adding time multiple times causes Runtime : java.sql.Timestamp cannot be cast 
> to java.lang.Long
> ---
>
> Key: FLINK-10999
> URL: https://issues.apache.org/jira/browse/FLINK-10999
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.5.5, 1.6.2
>Reporter: ideal-hp
>Priority: Major
>
> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to 
> java.lang.Long
>  at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:27)
>  at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95)
>  at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>  at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>  at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>  at 
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>  at 
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>  at DataStreamCalcRule$15.processElement(Unknown Source)
>  at 
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
>  at 
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>  at 
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dianfu opened a new pull request #7196: [FLINK-10974] [table] Add support for flatMap to table API

2018-11-29 Thread GitBox
dianfu opened a new pull request #7196: [FLINK-10974] [table] Add support for 
flatMap to table API
URL: https://github.com/apache/flink/pull/7196
 
 
   ## What is the purpose of the change
   
   *This pull request adds the flatMap operation to the table API.*
   
   ## Brief change log
   
 - *Move the implicit tableFunctionCall2Table from package.scala to 
expressionDsl.scala*
 - *Add implicit in expressionDsl.scala which can convert a TableFunction 
to an Expression*
 - *Add flatMap API in table.scala*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added 
FlatMapTest/FlatMapITCase/FlatMapValidationTest/FlatMapStringExpressionTest*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (docs)
   




[jira] [Assigned] (FLINK-11022) Update LICENSE and NOTICE files for older releases

2018-11-29 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-11022:


Assignee: Chesnay Schepler  (was: vinoyang)

> Update LICENSE and NOTICE files for older releases
> --
>
> Key: FLINK-11022
> URL: https://issues.apache.org/jira/browse/FLINK-11022
> Project: Flink
>  Issue Type: Bug
>Affects Versions: shaded-5.0, 1.5.5, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.6, 1.6.3, shaded-6.0
>
>
> This is a follow up task from FLINK-10987 to also update the {{LICENSE}} and 
> {{NOTICE}} files for the affected versions.





[jira] [Assigned] (FLINK-11023) Update LICENSE and NOTICE files for flink-connectors

2018-11-29 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-11023:


Assignee: Chesnay Schepler  (was: vinoyang)

> Update LICENSE and NOTICE files for flink-connectors
> 
>
> Key: FLINK-11023
> URL: https://issues.apache.org/jira/browse/FLINK-11023
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.5.6, 1.6.3, 1.7.1
>
>
> Similar to FLINK-10987 we should also update the {{LICENSE}} and {{NOTICE}} 
> files for {{flink-connectors}}.





[jira] [Commented] (FLINK-10999) Adding time multiple times causes Runtime : java.sql.Timestamp cannot be cast to java.lang.Long

2018-11-29 Thread xuqianjin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702877#comment-16702877
 ] 

xuqianjin commented on FLINK-10999:
---

[~harbby] You can see in the execution plan that you can't add rowtime and proctime 
in turn. I'm not sure if this needs to be changed.

> Adding time multiple times causes Runtime : java.sql.Timestamp cannot be cast 
> to java.lang.Long
> ---
>
> Key: FLINK-10999
> URL: https://issues.apache.org/jira/browse/FLINK-10999
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.5.5, 1.6.2
>Reporter: ideal-hp
>Priority: Major
>
> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to 
> java.lang.Long
>  at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:27)
>  at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95)
>  at 
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>  at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>  at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>  at 
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>  at 
> org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>  at DataStreamCalcRule$15.processElement(Unknown Source)
>  at 
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
>  at 
> org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>  at 
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>  at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)





[GitHub] maqingxiang commented on issue #7197: [hotfix][flink-runtime] add try catch when find a unique file name for the spilling channel

2018-11-29 Thread GitBox
maqingxiang commented on issue #7197: [hotfix][flink-runtime] add try catch 
when find a unique file name for the spilling channel
URL: https://github.com/apache/flink/pull/7197#issuecomment-442760936
 
 
   @zhijiangW
   
   Please review it, thanks.




[GitHub] twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread GitBox
twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237402642
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##
 @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends 
StreamingWithStateTestBase {
 // We do not assert the proctime in the result, cause it is currently
 // accessed from System.currentTimeMillis(), so there is no graceful way 
to assert the proctime
   }
+
+  @Test
+  def testRichUdfs(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(1)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+tEnv.getConfig.setMaxGeneratedCodeLength(1)
+StreamITCase.clear
+
+val data = new mutable.MutableList[(Int, String, Long)]
+data.+=((1, "a", 1))
+data.+=((2, "a", 1))
+data.+=((3, "a", 1))
+data.+=((4, "a", 1))
+data.+=((5, "a", 1))
+data.+=((6, "b", 1))
+data.+=((7, "a", 1))
+data.+=((8, "a", 1))
+data.+=((9, "f", 1))
+
+val t = env.fromCollection(data)
+  .toTable(tEnv, 'id, 'name, 'price, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+tEnv.registerFunction("prefix", new RichScalarFunc)
+val prefix = "PREF"
+UserDefinedFunctionTestUtils
+  .setJobParameters(env, Map("prefix" -> prefix))
+
+val sqlQuery =
+  s"""
+ |SELECT *
+ |FROM MyTable
+ |MATCH_RECOGNIZE (
+ |  ORDER BY proctime
+ |  MEASURES
+ |FIRST(id) as firstId,
+ |prefix(A.name) as prefixedNameA,
+ |LAST(id) as lastId
+ |  AFTER MATCH SKIP PAST LAST ROW
+ |  PATTERN (A+ C)
+ |  DEFINE
+ |A AS prefix(A.name) = '$prefix:a'
+ |) AS T
+ |""".stripMargin
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = mutable.MutableList("1,PREF:a,6", "7,PREF:a,9")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }
 
 class ToMillis extends ScalarFunction {
   def eval(t: Timestamp): Long = {
 t.toInstant.toEpochMilli + 
TimeZone.getDefault.getOffset(t.toInstant.toEpochMilli)
   }
 }
+
+private class RichScalarFunc extends ScalarFunction {
+
+  private var prefix = "ERROR_VALUE"
+
+  override def open(context: FunctionContext): Unit = {
+prefix = context.getJobParameter("prefix", "")
+  }
+
+  def eval(value: String): String = {
+s"$prefix:$value"
+  }
+
+  override def close(): Unit = {
+prefix = "ERROR_VALUE"
 
 Review comment:
   This line has no meaning so far. Remove the entire method?




[GitHub] twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread GitBox
twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237399504
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala
 ##
 @@ -60,17 +67,9 @@ class PatternSelectFunctionRunner(
 out.collect(outCRow)
   }
 
-  @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream): Unit = {
-in.defaultReadObject()
-
-if (outCRow == null) {
-  outCRow = new CRow(null, true)
-}
-
-if (function == null) {
-  init()
-}
+  override def close(): Unit = {
+super.close()
 
 Review comment:
   Remove this line?




[GitHub] twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread GitBox
twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237402448
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##
 @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends 
StreamingWithStateTestBase {
 // We do not assert the proctime in the result, cause it is currently
 // accessed from System.currentTimeMillis(), so there is no graceful way 
to assert the proctime
   }
+
+  @Test
+  def testRichUdfs(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(1)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+tEnv.getConfig.setMaxGeneratedCodeLength(1)
+StreamITCase.clear
+
+val data = new mutable.MutableList[(Int, String, Long)]
+data.+=((1, "a", 1))
+data.+=((2, "a", 1))
+data.+=((3, "a", 1))
+data.+=((4, "a", 1))
+data.+=((5, "a", 1))
+data.+=((6, "b", 1))
+data.+=((7, "a", 1))
+data.+=((8, "a", 1))
+data.+=((9, "f", 1))
+
+val t = env.fromCollection(data)
+  .toTable(tEnv, 'id, 'name, 'price, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+tEnv.registerFunction("prefix", new RichScalarFunc)
+val prefix = "PREF"
+UserDefinedFunctionTestUtils
+  .setJobParameters(env, Map("prefix" -> prefix))
+
+val sqlQuery =
+  s"""
+ |SELECT *
+ |FROM MyTable
+ |MATCH_RECOGNIZE (
+ |  ORDER BY proctime
+ |  MEASURES
+ |FIRST(id) as firstId,
+ |prefix(A.name) as prefixedNameA,
+ |LAST(id) as lastId
+ |  AFTER MATCH SKIP PAST LAST ROW
+ |  PATTERN (A+ C)
+ |  DEFINE
+ |A AS prefix(A.name) = '$prefix:a'
+ |) AS T
+ |""".stripMargin
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = mutable.MutableList("1,PREF:a,6", "7,PREF:a,9")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }
 
 class ToMillis extends ScalarFunction {
   def eval(t: Timestamp): Long = {
 t.toInstant.toEpochMilli + 
TimeZone.getDefault.getOffset(t.toInstant.toEpochMilli)
   }
 }
+
+private class RichScalarFunc extends ScalarFunction {
 
 Review comment:
   Give it a more meaningful name.




[GitHub] twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread GitBox
twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237401057
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##
 @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends 
StreamingWithStateTestBase {
 // We do not assert the proctime in the result, cause it is currently
 // accessed from System.currentTimeMillis(), so there is no graceful way 
to assert the proctime
   }
+
+  @Test
+  def testRichUdfs(): Unit = {
 
 Review comment:
   nit: There are no `RichUdfs`, just `UserDefinedFunctions` ;-)




[jira] [Updated] (FLINK-10974) Add FlatMap to TableAPI

2018-11-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10974:
---
Labels: pull-request-available  (was: )

> Add FlatMap to TableAPI
> ---
>
> Key: FLINK-10974
> URL: https://issues.apache.org/jira/browse/FLINK-10974
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add FlatMap operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>.flatMap(fun: TableFunction)  // output has columns 'a, 'b, 'c
>.select('a, 'c)
> {code}





[GitHub] twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread GitBox
twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237396997
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -131,47 +132,39 @@ class MatchCodeGenerator(
 : GeneratedFunction[F, T] = {
 val funcName = newName(name)
 val collectorTypeTerm = classOf[Collector[Any]].getCanonicalName
-val (functionClass, signature, inputStatements, isInterface) =
-  if (clazz == classOf[IterativeCondition[_]]) {
-val baseClass = classOf[IterativeCondition[_]]
+val (functionClass, signature, inputStatements) =
+  if (clazz == classOf[IterativeCondition[_]] || clazz == 
classOf[RichIterativeCondition[_]]) {
 
 Review comment:
   Is there a reason to support both interface and rich class? I would keep it 
simple here and only support the rich variant.




[jira] [Commented] (FLINK-10974) Add FlatMap to TableAPI

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702891#comment-16702891
 ] 

ASF GitHub Bot commented on FLINK-10974:


dianfu opened a new pull request #7196: [FLINK-10974] [table] Add support for 
flatMap to table API
URL: https://github.com/apache/flink/pull/7196
 
 
   ## What is the purpose of the change
   
   *This pull request adds the flatMap operation to the table API.*
   
   ## Brief change log
   
 - *Move the implicit tableFunctionCall2Table from package.scala to 
expressionDsl.scala*
 - *Add implicit in expressionDsl.scala which can convert a TableFunction 
to an Expression*
 - *Add flatMap API in table.scala*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added 
FlatMapTest/FlatMapITCase/FlatMapValidationTest/FlatMapStringExpressionTest*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (docs)
   




> Add FlatMap to TableAPI
> ---
>
> Key: FLINK-10974
> URL: https://issues.apache.org/jira/browse/FLINK-10974
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Add FlatMap operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>.flatMap(fun: TableFunction)  // output has columns 'a, 'b, 'c
>.select('a, 'c)
> {code}





[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702900#comment-16702900
 ] 

ASF GitHub Bot commented on FLINK-10597:


twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237402642
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##
 @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends 
StreamingWithStateTestBase {
 // We do not assert the proctime in the result, cause it is currently
 // accessed from System.currentTimeMillis(), so there is no graceful way 
to assert the proctime
   }
+
+  @Test
+  def testRichUdfs(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(1)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+tEnv.getConfig.setMaxGeneratedCodeLength(1)
+StreamITCase.clear
+
+val data = new mutable.MutableList[(Int, String, Long)]
+data.+=((1, "a", 1))
+data.+=((2, "a", 1))
+data.+=((3, "a", 1))
+data.+=((4, "a", 1))
+data.+=((5, "a", 1))
+data.+=((6, "b", 1))
+data.+=((7, "a", 1))
+data.+=((8, "a", 1))
+data.+=((9, "f", 1))
+
+val t = env.fromCollection(data)
+  .toTable(tEnv, 'id, 'name, 'price, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+tEnv.registerFunction("prefix", new RichScalarFunc)
+val prefix = "PREF"
+UserDefinedFunctionTestUtils
+  .setJobParameters(env, Map("prefix" -> prefix))
+
+val sqlQuery =
+  s"""
+ |SELECT *
+ |FROM MyTable
+ |MATCH_RECOGNIZE (
+ |  ORDER BY proctime
+ |  MEASURES
+ |FIRST(id) as firstId,
+ |prefix(A.name) as prefixedNameA,
+ |LAST(id) as lastId
+ |  AFTER MATCH SKIP PAST LAST ROW
+ |  PATTERN (A+ C)
+ |  DEFINE
+ |A AS prefix(A.name) = '$prefix:a'
+ |) AS T
+ |""".stripMargin
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = mutable.MutableList("1,PREF:a,6", "7,PREF:a,9")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }
 
 class ToMillis extends ScalarFunction {
   def eval(t: Timestamp): Long = {
 t.toInstant.toEpochMilli + 
TimeZone.getDefault.getOffset(t.toInstant.toEpochMilli)
   }
 }
+
+private class RichScalarFunc extends ScalarFunction {
+
+  private var prefix = "ERROR_VALUE"
+
+  override def open(context: FunctionContext): Unit = {
+prefix = context.getJobParameter("prefix", "")
+  }
+
+  def eval(value: String): String = {
+s"$prefix:$value"
+  }
+
+  override def close(): Unit = {
+prefix = "ERROR_VALUE"
 
 Review comment:
   This line has no meaning so far. Remove the entire method?




> Enable UDFs support in MATCH_RECOGNIZE
> --
>
> Key: FLINK-10597
> URL: https://issues.apache.org/jira/browse/FLINK-10597
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>






[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702893#comment-16702893
 ] 

ASF GitHub Bot commented on FLINK-10597:


twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237398634
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala
 ##
 @@ -31,30 +31,27 @@ import org.apache.flink.types.Row
 class IterativeConditionRunner(
 name: String,
 code: String)
-  extends IterativeCondition[Row]
-  with Compiler[IterativeCondition[Row]]
+  extends RichIterativeCondition[Row]
+  with Compiler[RichIterativeCondition[Row]]
   with Logging {
 
-  @transient private var function: IterativeCondition[Row] = _
+  @transient private var function: RichIterativeCondition[Row] = _
 
-  def init(): Unit = {
+  override def open(parameters: Configuration): Unit = {
 LOG.debug(s"Compiling IterativeCondition: $name \n\n Code:\n$code")
-// We cannot get user's classloader currently, see FLINK-6938 for details
-val clazz = compile(Thread.currentThread().getContextClassLoader, name, 
code)
+val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
 LOG.debug("Instantiating IterativeCondition.")
 function = clazz.newInstance()
-// TODO add logic for opening and closing the function once it can be a 
RichFunction
+FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+FunctionUtils.openFunction(function, parameters)
   }
 
   override def filter(value: Row, ctx: IterativeCondition.Context[Row]): 
Boolean = {
 function.filter(value, ctx)
   }
 
-  @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream): Unit = {
-in.defaultReadObject()
-if (function == null) {
-  init()
-}
+  override def close(): Unit = {
+super.close()
 
 Review comment:
   Remove this line?




> Enable UDFs support in MATCH_RECOGNIZE
> --
>
> Key: FLINK-10597
> URL: https://issues.apache.org/jira/browse/FLINK-10597
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>






[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702895#comment-16702895
 ] 

ASF GitHub Bot commented on FLINK-10597:


twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237398248
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala
 ##
 @@ -31,30 +31,27 @@ import org.apache.flink.types.Row
 class IterativeConditionRunner(
 name: String,
 code: String)
-  extends IterativeCondition[Row]
-  with Compiler[IterativeCondition[Row]]
+  extends RichIterativeCondition[Row]
+  with Compiler[RichIterativeCondition[Row]]
   with Logging {
 
-  @transient private var function: IterativeCondition[Row] = _
+  @transient private var function: RichIterativeCondition[Row] = _
 
-  def init(): Unit = {
+  override def open(parameters: Configuration): Unit = {
 LOG.debug(s"Compiling IterativeCondition: $name \n\n Code:\n$code")
-// We cannot get user's classloader currently, see FLINK-6938 for details
-val clazz = compile(Thread.currentThread().getContextClassLoader, name, 
code)
+val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
 LOG.debug("Instantiating IterativeCondition.")
 function = clazz.newInstance()
-// TODO add logic for opening and closing the function once it can be a RichFunction
+FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
 
 Review comment:
   Is there a reason why the distributed cache is not supported in 
`CepRuntimeContext`? We don't expose many context properties in SQL UDFs, but 
the distributed cache seems to be an important one.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702899#comment-16702899
 ] 

ASF GitHub Bot commented on FLINK-10597:


twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237401057
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##
 @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase {
 // We do not assert the proctime in the result, cause it is currently
 // accessed from System.currentTimeMillis(), so there is no graceful way to assert the proctime
   }
+
+  @Test
+  def testRichUdfs(): Unit = {
 
 Review comment:
   nit: There are no `RichUdfs`, just `UserDefinedFunctions` ;-)




[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702896#comment-16702896
 ] 

ASF GitHub Bot commented on FLINK-10597:


twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237399713
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/IterativeConditionRunner.scala
 ##
 @@ -31,30 +31,27 @@ import org.apache.flink.types.Row
 class IterativeConditionRunner(
 name: String,
 code: String)
-  extends IterativeCondition[Row]
-  with Compiler[IterativeCondition[Row]]
+  extends RichIterativeCondition[Row]
+  with Compiler[RichIterativeCondition[Row]]
   with Logging {
 
-  @transient private var function: IterativeCondition[Row] = _
+  @transient private var function: RichIterativeCondition[Row] = _
 
-  def init(): Unit = {
+  override def open(parameters: Configuration): Unit = {
 LOG.debug(s"Compiling IterativeCondition: $name \n\n Code:\n$code")
-// We cannot get user's classloader currently, see FLINK-6938 for details
-val clazz = compile(Thread.currentThread().getContextClassLoader, name, code)
+val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
 LOG.debug("Instantiating IterativeCondition.")
 function = clazz.newInstance()
-// TODO add logic for opening and closing the function once it can be a RichFunction
+FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+FunctionUtils.openFunction(function, parameters)
   }
 
   override def filter(value: Row, ctx: IterativeCondition.Context[Row]): 
Boolean = {
 function.filter(value, ctx)
   }
 
-  @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream): Unit = {
-in.defaultReadObject()
-if (function == null) {
-  init()
-}
+  override def close(): Unit = {
+super.close()
+function.close()
 
 Review comment:
   Use `org.apache.flink.api.common.functions.util.FunctionUtils#closeFunction` 
instead?




[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702892#comment-16702892
 ] 

ASF GitHub Bot commented on FLINK-10597:


twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237396997
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -131,47 +132,39 @@ class MatchCodeGenerator(
 : GeneratedFunction[F, T] = {
 val funcName = newName(name)
 val collectorTypeTerm = classOf[Collector[Any]].getCanonicalName
-val (functionClass, signature, inputStatements, isInterface) =
-  if (clazz == classOf[IterativeCondition[_]]) {
-val baseClass = classOf[IterativeCondition[_]]
+val (functionClass, signature, inputStatements) =
+  if (clazz == classOf[IterativeCondition[_]] || clazz == classOf[RichIterativeCondition[_]]) {
 
 Review comment:
   Is there a reason to support both the interface and the rich class? I would 
keep it simple here and only support the rich variant.




[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702898#comment-16702898
 ] 

ASF GitHub Bot commented on FLINK-10597:


twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237400562
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##
 @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends 
StreamingWithStateTestBase {
 // We do not assert the proctime in the result, cause it is currently
 
 Review comment:
   I guess this problem has not been solved with the new rich function?




[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702897#comment-16702897
 ] 

ASF GitHub Bot commented on FLINK-10597:


twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237399504
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala
 ##
 @@ -60,17 +67,9 @@ class PatternSelectFunctionRunner(
 out.collect(outCRow)
   }
 
-  @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream): Unit = {
-in.defaultReadObject()
-
-if (outCRow == null) {
-  outCRow = new CRow(null, true)
-}
-
-if (function == null) {
-  init()
-}
+  override def close(): Unit = {
+super.close()
 
 Review comment:
   Remove this line?




[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702901#comment-16702901
 ] 

ASF GitHub Bot commented on FLINK-10597:


twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237399688
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/PatternSelectFunctionRunner.scala
 ##
 @@ -60,17 +67,9 @@ class PatternSelectFunctionRunner(
 out.collect(outCRow)
   }
 
-  @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream): Unit = {
-in.defaultReadObject()
-
-if (outCRow == null) {
-  outCRow = new CRow(null, true)
-}
-
-if (function == null) {
-  init()
-}
+  override def close(): Unit = {
+super.close()
+function.close()
 
 Review comment:
   Use `org.apache.flink.api.common.functions.util.FunctionUtils#closeFunction` 
instead?




[GitHub] maqingxiang opened a new pull request #7197: [hotfix][flink-runtime] add try catch when find a unique file name for the spilling channel

2018-11-29 Thread GitBox
maqingxiang opened a new pull request #7197: [hotfix][flink-runtime] add try 
catch when find a unique file name for the spilling channel
URL: https://github.com/apache/flink/pull/7197
 
 
   ## What is the purpose of the change
   
   Catch exceptions thrown due to disk loss and retry finding a unique file 
name for the spilling channel.
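
A minimal sketch of the retry idea described above, not the code from this PR: each attempt picks a temp directory, and an IOException (for example from a directory on a lost disk) leads to another attempt instead of an immediate failure. The class name `SpillFileAllocator`, the `.spill` suffix, and the `maxAttempts` parameter are assumptions made for illustration.

```java
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.UUID;

/** Hypothetical helper for illustration; not part of Flink. */
final class SpillFileAllocator {

    /** Tries up to maxAttempts times to create a new, uniquely named file in one of tempDirs. */
    static File createUniqueFile(List<File> tempDirs, int maxAttempts) throws IOException {
        if (tempDirs.isEmpty() || maxAttempts < 1) {
            throw new IllegalArgumentException("need at least one directory and one attempt");
        }
        IOException lastFailure = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            // Rotate through the directories so that a broken one does not block progress.
            File dir = tempDirs.get(attempt % tempDirs.size());
            File candidate = new File(dir, UUID.randomUUID() + ".spill");
            try {
                if (candidate.createNewFile()) {
                    return candidate;
                }
            } catch (IOException e) {
                // For example, the directory lives on a lost disk: remember the error and retry.
                lastFailure = e;
            }
        }
        throw new IOException(
            "Could not create a unique spill file after " + maxAttempts + " attempts", lastFailure);
    }
}
```

Rotating over the directories is one possible choice; picking a random directory per attempt would serve the same purpose.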
   




With regards,
Apache Git Services


[jira] [Commented] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704290#comment-16704290
 ] 

ASF GitHub Bot commented on FLINK-10712:


Myasuka commented on issue #7009: [FLINK-10712] Support to restore state when 
using RestartPipelinedRegionStrategy
URL: https://github.com/apache/flink/pull/7009#issuecomment-443096765
 
 
   @StefanRRichter Thanks for your comments; I will refactor this PR.
   BTW, I found that region failover without letting the checkpoint coordinator 
restart its `checkpointScheduler` cannot guarantee `EXACTLY_ONCE` semantics. I'll 
include this modification in the next commits.




> RestartPipelinedRegionStrategy does not restore state
> -
>
> Key: FLINK-10712
> URL: https://issues.apache.org/jira/browse/FLINK-10712
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2, 1.7.0
>Reporter: Stefan Richter
>Assignee: Yun Tang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> RestartPipelinedRegionStrategy does not perform any state restore. This is a 
> big problem because all restored regions will be restarted with empty state. 
> We need to take checkpoints into account when restoring.





[GitHub] Myasuka commented on issue #7009: [FLINK-10712] Support to restore state when using RestartPipelinedRegionStrategy

2018-11-29 Thread GitBox
Myasuka commented on issue #7009: [FLINK-10712] Support to restore state when 
using RestartPipelinedRegionStrategy
URL: https://github.com/apache/flink/pull/7009#issuecomment-443096765
 
 
   @StefanRRichter Thanks for your comments; I will refactor this PR.
   BTW, I found that region failover without letting the checkpoint coordinator 
restart its `checkpointScheduler` cannot guarantee `EXACTLY_ONCE` semantics. I'll 
include this modification in the next commits.




[jira] [Commented] (FLINK-10930) Refactor checkpoint directory layout

2018-11-29 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704286#comment-16704286
 ] 

Yun Tang commented on FLINK-10930:
--

[~StephanEwen] I think your suggestion to refactor the mechanism of 
{{CheckpointExceptionHandler}} sounds more reasonable: we should always decline 
the checkpoint on the task side instead of just failing the task. Maybe we should 
revisit FLINK-4810 to let the checkpoint coordinator fail after "n" unsuccessful 
checkpoints.

Some details of this problem remain open:
 # Whether we should also fail the whole execution graph after "n" *expired* 
checkpoints.
 # If the checkpoint coordinator receives a late message for an expired 
checkpoint, it should just delete the parent folder of the file state handle 
(that is, the {{chk-i}} folder) so that no untouched {{chk-i}} folders are left behind.
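
The "fail after n unsuccessful checkpoints" idea could be sketched as a small counter, shown below. This is an illustration only and not code from FLINK-4810: the class `CheckpointFailureCounter` is hypothetical, and it assumes that "n" refers to consecutive failures, so a completed checkpoint resets the count.

```java
/** Hypothetical helper for illustration; not a Flink class. */
final class CheckpointFailureCounter {
    private final int maxFailures; // the "n" from the comment above
    private int consecutiveFailures;

    CheckpointFailureCounter(int maxFailures) {
        if (maxFailures < 1) {
            throw new IllegalArgumentException("maxFailures must be at least 1");
        }
        this.maxFailures = maxFailures;
    }

    /** A completed checkpoint resets the failure streak (assumption: failures must be consecutive). */
    void onCheckpointCompleted() {
        consecutiveFailures = 0;
    }

    /** Records a declined or expired checkpoint; returns true once the job should be failed. */
    boolean onCheckpointFailed() {
        consecutiveFailures++;
        return consecutiveFailures >= maxFailures;
    }
}
```

Whether expired checkpoints should be counted the same way as declined ones is exactly the first open question listed above; the sketch treats both alike.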

> Refactor checkpoint directory layout
> 
>
> Key: FLINK-10930
> URL: https://issues.apache.org/jira/browse/FLINK-10930
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.8.0
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.8.0
>
>
> The current checkpoint directory layout was introduced by FLINK-8531 with 
> three different scopes for tasks:
>  * *EXCLUSIVE* is for state that belongs to one checkpoint only: metadata 
> and operator state files.
>  * *SHARED* is for state that is possibly part of multiple checkpoints.
>  * *TASKOWNED* is for state that must never be dropped by the JobManager.
> {code:java}
> /user-defined-dir/{job-id}
>   |
>   +-- shared/
>   +-- taskowned/
>   +-- chk-1/  // metadata and operator-state files
>   +-- chk-2/
> ...{code}
> If we retain just one completed checkpoint, only one exclusive directory, 
> the {{chk-id}} checkpoint directory, should be left. 
> However, as explained in FLINK-10855, the directories of failed/expired 
> checkpoints are also left behind. This is really confusing for users who [use 
> an externalized checkpoint to resume a 
> job|https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint],
>  not to mention the checkpoint directory resource leak. 
>  As far as I know, if the {{chk-id}} checkpoint directory still 
> contains operator state files, I see no way to clean up the useless 
> {{chk-id}} checkpoint directory gracefully. Once the job manager disposes of a 
> failed/expired checkpoint, the target {{chk-id}} checkpoint directory is 
> deleted by the JM. However, this directory would also be created by tasks that 
> have not yet reported to the JM. When the {{checkpoint coordinator}} receives 
> the late reports of those tasks, it discards the useless handles. However, if 
> the JM also deleted the empty parent folder, which is no longer supported 
> after FLINK-8540, another task uploading operator state files would hit an 
> exception because the parent directory of its write target has just been removed. 
> Currently, we handle a task checkpoint failure as a task failure, so the whole 
> job fails over, which is not what we want.
> To address this, I plan to separate the *EXCLUSIVE* directory into two kinds of 
> exclusive directories: one is still the set of {{chk-id}} checkpoint directories, 
> each containing only its exclusive {{meta data}}; the other is a single 
> directory named {{exclusive}} that contains the operator state files. 
> Operator state files are exclusive to one specific checkpoint, so we could 
> also add the {{checkpoint-id}} to their file names to let users easily clean 
> them up.
>  The refactored directory layout should be:
> {code:java}
> /user-defined-dir/{job-id}
>   |
>   +-- shared/
>   +-- taskowned/
>   +-- exclusive/  // operator state files
>   +-- chk-1/      // metadata
>   +-- chk-2/
> ...{code}
>  
> This new directory layout would not affect users who use an externalized 
> checkpoint to resume jobs, since they would still just pass the 
> {{/user-defined-dir/job-id/chk-id}} path to resume a job.
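
The proposed layout can be sketched as a small path helper. This is only an illustration of the description above, not Flink code: the class `ProposedCheckpointLayout` is hypothetical, and the `chk-<id>-<name>` file naming inside `exclusive/` is an assumed way of putting the checkpoint id into the file name.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper for illustration; not a Flink class. */
final class ProposedCheckpointLayout {
    private final Path jobDir;

    ProposedCheckpointLayout(String userDefinedDir, String jobId) {
        this.jobDir = Paths.get(userDefinedDir, jobId);
    }

    /** State that may be part of multiple checkpoints. */
    Path sharedDir() {
        return jobDir.resolve("shared");
    }

    /** State that must never be dropped by the JobManager. */
    Path taskOwnedDir() {
        return jobDir.resolve("taskowned");
    }

    /** Per-checkpoint directory that holds only the metadata. */
    Path metadataDir(long checkpointId) {
        return jobDir.resolve("chk-" + checkpointId);
    }

    /** Operator state file in the single exclusive directory, tagged with its checkpoint id. */
    Path operatorStateFile(long checkpointId, String fileName) {
        return jobDir.resolve("exclusive").resolve("chk-" + checkpointId + "-" + fileName);
    }
}
```

With this split, deleting a `chk-<id>` directory never removes the parent directory of a file that a late task is still writing, because operator state files no longer live under `chk-<id>`.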





[GitHub] yanghua commented on issue #7026: [FLINK-10798] Add the version number of Flink 1.7 to MigrationVersion

2018-11-29 Thread GitBox
yanghua commented on issue #7026: [FLINK-10798] Add the version number of Flink 
1.7 to MigrationVersion
URL: https://github.com/apache/flink/pull/7026#issuecomment-443107105
 
 
   Since Flink 1.7 has been released, shall we merge this PR?




[GitHub] yanghua commented on issue #7182: [FLINK-11015] Remove deprecated methods and classes about table from all the Kafka connectors

2018-11-29 Thread GitBox
yanghua commented on issue #7182: [FLINK-11015] Remove deprecated methods and 
classes about table from all the Kafka connectors
URL: https://github.com/apache/flink/pull/7182#issuecomment-443107227
 
 
   @twalthr Shall we merge this PR? Thanks.




[jira] [Commented] (FLINK-10798) Add the version number of Flink 1.7 to MigrationVersion

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704324#comment-16704324
 ] 

ASF GitHub Bot commented on FLINK-10798:


yanghua commented on issue #7026: [FLINK-10798] Add the version number of Flink 
1.7 to MigrationVersion
URL: https://github.com/apache/flink/pull/7026#issuecomment-443107105
 
 
   Since Flink 1.7 has been released, shall we merge this PR?




> Add the version number of Flink 1.7 to MigrationVersion
> ---
>
> Key: FLINK-10798
> URL: https://issues.apache.org/jira/browse/FLINK-10798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>






[jira] [Commented] (FLINK-10252) Handle oversized metric messges

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704326#comment-16704326
 ] 

ASF GitHub Bot commented on FLINK-10252:


yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#issuecomment-443107471
 
 
   @zentol 




> Handle oversized metric messges
> ---
>
> Key: FLINK-10252
> URL: https://issues.apache.org/jira/browse/FLINK-10252
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1
>
>
> Since the {{MetricQueryService}} is implemented as an Akka actor, it can only 
> send messages that are smaller than the current {{akka.framesize}}. As in 
> FLINK-10251, we should check whether the payload exceeds the maximum 
> framesize and fail fast if it does.
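
A minimal sketch of such a fail-fast check, assuming the payload is already serialized to a byte array. The class `FrameSizeGuard` and its limit parameter are hypothetical and only illustrate the comparison; how the limit is derived from {{akka.framesize}} is not shown.

```java
/** Hypothetical helper for illustration; not a Flink class. */
final class FrameSizeGuard {
    private final long maxFrameSizeBytes;

    FrameSizeGuard(long maxFrameSizeBytes) {
        this.maxFrameSizeBytes = maxFrameSizeBytes;
    }

    /** Returns the payload unchanged if it fits into one frame; throws immediately otherwise. */
    byte[] checkPayload(byte[] serializedPayload) {
        if (serializedPayload.length > maxFrameSizeBytes) {
            throw new IllegalStateException(
                "Payload of " + serializedPayload.length
                    + " bytes exceeds the maximum frame size of " + maxFrameSizeBytes + " bytes");
        }
        return serializedPayload;
    }
}
```

Checking before sending lets the sender report a clear error instead of having the message dropped by the transport.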





[GitHub] yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messges

2018-11-29 Thread GitBox
yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#issuecomment-443107471
 
 
   @zentol 




[jira] [Commented] (FLINK-11015) Remove deprecated methods and classes about table from all the Kafka connectors

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704325#comment-16704325
 ] 

ASF GitHub Bot commented on FLINK-11015:


yanghua commented on issue #7182: [FLINK-11015] Remove deprecated methods and 
classes about table from all the Kafka connectors
URL: https://github.com/apache/flink/pull/7182#issuecomment-443107227
 
 
   @twalthr Shall we merge this PR? Thanks.




> Remove deprecated methods and classes about table from all the Kafka 
> connectors
> ---
>
> Key: FLINK-11015
> URL: https://issues.apache.org/jira/browse/FLINK-11015
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> This is to make the "Disentangle flink-connector-kafka from flink-table and 
> flink-json" (FLINK-9461) plan go more smoothly. For more details, please refer 
> to the discussion in FLINK-9461.





[jira] [Commented] (FLINK-11017) Time interval for window aggregations in SQL is wrongly translated if specified with YEAR_MONTH resolution

2018-11-29 Thread xuqianjin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704307#comment-16704307
 ] 

xuqianjin commented on FLINK-11017:
---

[~dawidwys] I want to try to solve it.

> Time interval for window aggregations in SQL is wrongly translated if 
> specified with YEAR_MONTH resolution
> --
>
> Key: FLINK-11017
> URL: https://issues.apache.org/jira/browse/FLINK-11017
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.2, 1.7.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
> Fix For: 1.6.3, 1.8.0, 1.7.1
>
>
> If a time interval is specified with {{YEAR TO MONTH}} resolution, e.g.:
> {code}
> SELECT * 
> FROM Mytable
> GROUP BY 
> TUMBLE(rowtime, INTERVAL '1-2' YEAR TO MONTH)
> {code}
> it is wrongly translated to a 14-millisecond window. We should allow 
> only DAY TO SECOND resolution.
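
The number 14 in the description can be reproduced with a small calculation: the literal '1-2' YEAR TO MONTH means 1 year and 2 months, that is 14 months, and the bug reads this month count as if it were milliseconds. The sketch below only illustrates that arithmetic; the class `YearMonthInterval` is hypothetical, it is not the planner code, and it ignores signed literals.

```java
/** Hypothetical helper for illustration; not the planner code. */
final class YearMonthInterval {

    /** Parses a "years-months" literal such as "1-2" into a total number of months. */
    static long toMonths(String literal) {
        String[] parts = literal.split("-");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected 'years-months', got: " + literal);
        }
        long years = Long.parseLong(parts[0]);
        long months = Long.parseLong(parts[1]);
        return years * 12 + months;
    }
}
```

Because a month has no fixed length in milliseconds, such a value cannot simply be reused as a window size, which is consistent with the suggestion to accept only DAY TO SECOND intervals.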





[jira] [Created] (FLINK-11035) Notify data available to network stack immediately after finishing BufferBuilder

2018-11-29 Thread zhijiang (JIRA)
zhijiang created FLINK-11035:


 Summary: Notify data available to network stack immediately after 
finishing BufferBuilder
 Key: FLINK-11035
 URL: https://issues.apache.org/jira/browse/FLINK-11035
 Project: Flink
  Issue Type: Sub-task
  Components: Network
Affects Versions: 1.8.0
Reporter: zhijiang


The data availability notification for the network stack relies on whether 
there is a finished _BufferBuilder_ or a flush has been triggered. If no flush 
is triggered and the first _BufferBuilder_ is enqueued into the subpartition, 
then even though this _BufferBuilder_ is finished on the _RecordWriter_ side, 
the notification is only triggered when the second _BufferBuilder_ is enqueued. 
This may delay the transport of the finished _BufferBuilder_ over the network, 
especially when requesting the second _BufferBuilder_ from the pool is a 
blocking operation.

In the extreme case where only one buffer is available in the LocalBufferPool, 
if the first _BufferBuilder_ is not transported and recycled, the request for 
the second _BufferBuilder_ will block forever.

I propose adding a _notifyBufferFinished_ method to the _ResultPartitionWriter_ 
interface, so that the _RecordWriter_ can notify through it after a 
_BufferBuilder_ is finished.
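
The behavior described above and the proposed fix can be modeled with a toy class. This is a sketch under simplifying assumptions, not Flink's network stack: `SketchSubpartition` is hypothetical, buffers are plain strings, and a notification is just a counter. Only the method name `notifyBufferFinished` is taken from the proposal.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical toy model for illustration; not Flink's subpartition. */
final class SketchSubpartition {
    private final Queue<String> buffers = new ArrayDeque<>();
    private int notifications;

    /** Described current behavior: only enqueuing a second buffer signals data availability. */
    void add(String buffer) {
        buffers.add(buffer);
        if (buffers.size() > 1) {
            notifications++;
        }
    }

    /** Proposed hook: the writer signals availability as soon as a buffer is finished. */
    void notifyBufferFinished() {
        notifications++;
    }

    int notifications() {
        return notifications;
    }
}
```

In this model a single finished buffer stays unannounced until `notifyBufferFinished()` is called, which mirrors the delay described in the issue.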

 





[GitHub] zhijiangW commented on issue #7197: [hotfix][flink-runtime] add try catch when find a unique file name for the spilling channel

2018-11-29 Thread GitBox
zhijiangW commented on issue #7197: [hotfix][flink-runtime] add try catch when 
find a unique file name for the spilling channel
URL: https://github.com/apache/flink/pull/7197#issuecomment-443095850
 
 
   lgtm!




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704330#comment-16704330
 ] 

ASF GitHub Bot commented on FLINK-10431:


Clark commented on issue #6898: [FLINK-10431] Extraction of 
scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#issuecomment-443109202
 
 
   Hi @StefanRRichter, I am just wondering why SlotPool should no longer be 
an RpcEndpoint?




> Extract scheduling-related code from SlotPool
> -
>
> Key: FLINK-10431
> URL: https://issues.apache.org/jira/browse/FLINK-10431
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> The other half of the current scheduling logic is the management of slot 
> sharing and is located in the SlotPool. We need to extract this logic into 
> our new Scheduler component from the previous step. This leaves us with a 
> simpler SlotPool that mainly cares about obtaining, holding, and releasing 
> slots in interaction with a ResourceManager. The new Scheduler can now 
> identify slot sharing groups and interact with the SlotPool.





[GitHub] Clarkkkkk commented on issue #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler

2018-11-29 Thread GitBox
Clark commented on issue #6898: [FLINK-10431] Extraction of 
scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#issuecomment-443109202
 
 
   Hi @StefanRRichter, I am just wondering why SlotPool should no longer be 
an RpcEndpoint?




[jira] [Commented] (FLINK-10993) Bring bloomfilter as a public API

2018-11-29 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704340#comment-16704340
 ] 

vinoyang commented on FLINK-10993:
--

[~fhueske] Yes, I looked at the design document attached to FLINK-8601. Overall, 
it looks good to me, so I agree with you. Let's move the discussion over to 
FLINK-8601.

> Bring bloomfilter as a public API
> -
>
> Key: FLINK-10993
> URL: https://issues.apache.org/jira/browse/FLINK-10993
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> Flink internally provides an implementation of BloomFilter, but only for 
> internal optimization, and does not provide APIs for public access.
> Here is an earlier discussion from the user mailing list: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Bloom-filter-in-Flink-td10608.html
> Considering that many users have the need to "determine duplicates" in 
> streaming computing, I think it would make sense to provide such an API.
> In addition, Spark has provided BloomFilter as a public API : 
> {code:java}
> val bf = df.stat.bloomFilter("dd",dataLen,0.01)
> val rightNum = rdd.map(x=>(x.toInt,bf.mightContainString(x)))
> {code}
>  
>  
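
To make the "determine duplicates" use case above concrete, here is a minimal, self-contained Bloom filter sketch in Java. It is neither Flink's internal BloomFilter nor the API proposed in FLINK-8601; the class name SimpleBloomFilter, its methods, and the hash seeds are hypothetical and chosen only for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

/** Hypothetical, illustrative Bloom filter; not Flink's internal implementation. */
final class SimpleBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    SimpleBloomFilter(int expectedEntries, double falsePositiveRate) {
        // Standard sizing: m = -n * ln(p) / (ln 2)^2 bits, k = (m / n) * ln 2 hash functions.
        double ln2 = Math.log(2);
        this.numBits = Math.max(1,
            (int) Math.ceil(-expectedEntries * Math.log(falsePositiveRate) / (ln2 * ln2)));
        this.numHashes = Math.max(1,
            (int) Math.round((double) numBits / expectedEntries * ln2));
        this.bits = new BitSet(numBits);
    }

    /** Sets the k bit positions derived from the value. */
    void add(String value) {
        int h1 = hash(value, 0x9747b28c);
        int h2 = hash(value, 0x85ebca6b);
        for (int i = 0; i < numHashes; i++) {
            bits.set(Math.floorMod(h1 + i * h2, numBits));
        }
    }

    /** Returns false if the value was definitely never added; true means "possibly added". */
    boolean mightContain(String value) {
        int h1 = hash(value, 0x9747b28c);
        int h2 = hash(value, 0x85ebca6b);
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(Math.floorMod(h1 + i * h2, numBits))) {
                return false;
            }
        }
        return true;
    }

    // Seeded FNV-1a-style hash over the UTF-8 bytes; two seeds give the two base hashes
    // that are combined as h1 + i * h2 (double hashing).
    private static int hash(String value, int seed) {
        int h = seed;
        for (byte b : value.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xff);
            h *= 16777619;
        }
        return h;
    }
}
```

A filter like this never reports a false negative, so a "false" answer can be used to skip an exact duplicate check; a "true" answer still needs confirmation if exactness matters.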





[jira] [Commented] (FLINK-11034) Provide "rewriting config" to file system factory

2018-11-29 Thread Tony Xintong Song (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704369#comment-16704369
 ] 

Tony Xintong Song commented on FLINK-11034:
---

I agree with you that we should allow users to configure multiple jobs in the same 
session differently. However, there are also config options that should remain 
consistent during the lifecycle of the whole session. I think the first step 
towards this job-specific config issue is probably to separate config options into 
two categories: job-level configs, which should be configured for each job 
independently, and session-level configs, which should remain consistent. And 
probably we should have two config files for the two categories. Otherwise, it 
could be troublesome for the user to figure out which config options can be 
"rewritten" when submitting a new job to an existing session and which cannot.

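The job-level versus session-level split suggested in the comment above could be sketched roughly as follows. This is only an illustration under stated assumptions: the class SessionConfigOverlay and its method forJob are hypothetical and do not exist in Flink, and which keys count as job-level is an assumption made by the caller.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: session-level config plus a whitelist of job-level keys. */
final class SessionConfigOverlay {
    private final Map<String, String> sessionConfig;
    private final Set<String> jobLevelKeys;

    SessionConfigOverlay(Map<String, String> sessionConfig, Set<String> jobLevelKeys) {
        // Defensive copies so later changes by the caller do not leak into the session.
        this.sessionConfig = new HashMap<>(sessionConfig);
        this.jobLevelKeys = new HashSet<>(jobLevelKeys);
    }

    /** Returns the effective config for one job; rejects overrides of session-level keys. */
    Map<String, String> forJob(Map<String, String> jobOverrides) {
        Map<String, String> effective = new HashMap<>(sessionConfig);
        for (Map.Entry<String, String> override : jobOverrides.entrySet()) {
            if (!jobLevelKeys.contains(override.getKey())) {
                throw new IllegalArgumentException(
                    "'" + override.getKey() + "' is a session-level option and cannot be rewritten per job");
            }
            effective.put(override.getKey(), override.getValue());
        }
        return effective;
    }
}
```

With an explicit whitelist, a user who tries to rewrite a session-level option gets an immediate error instead of a silently ignored setting, which addresses the "which options can be rewritten" confusion raised above.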
> Provide "rewriting config" to file system factory 
> --
>
> Key: FLINK-11034
> URL: https://issues.apache.org/jira/browse/FLINK-11034
> Project: Flink
>  Issue Type: Improvement
>Reporter: Wei-Che Wei
>Priority: Major
>
> The discussion in this mailing list thread 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/how-to-override-s3-key-config-in-flink-job-td24606.html]
> showed that it is not possible to overwrite the config of the file system 
> factory when submitting a Flink job.
> That means the same config is shared by all jobs in a session cluster. It 
> also means a user can't use different configurations for checkpointing and 
> the file sink. For example, a user might have different s3 buckets for 
> checkpointing and the file sink, and each bucket might have a different s3 
> access key for management reasons.
> We might need to provide a way to overwrite the configuration when calling 
> the file system factory's "get" method, so that user-facing components, like 
> checkpointing or the file sink, can take the overriding config from the user 
> and create a file system with those changes applied.





[jira] [Commented] (FLINK-8739) Optimize runtime support for distinct filter

2018-11-29 Thread Dian Fu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704356#comment-16704356
 ] 

Dian Fu commented on FLINK-8739:


Hi [~walterddr], are you still working on this ticket? I'd like to help on this 
ticket if you haven't started the work.  Thanks a lot.

> Optimize runtime support for distinct filter
> 
>
> Key: FLINK-8739
> URL: https://issues.apache.org/jira/browse/FLINK-8739
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Rong Rong
>Priority: Major
>
> Possible optimizations:
> 1. Decouple distinct map and actual accumulator so that they can separately 
> be created in codegen.
> 2. Reuse same distinct accumulator for filtering, e.g. `SELECT 
> COUNT(DISTINCT(a)), SUM(DISTINCT(a))` should reuse the same distinct map.
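
The second point, reusing one distinct map for several distinct aggregates, can be illustrated with a small standalone Java sketch. It is not the Flink codegen or the actual accumulator classes; SharedDistinctAggregates and its methods are hypothetical names, and the input column is assumed to be of type long.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: COUNT(DISTINCT a) and SUM(DISTINCT a) backed by one shared distinct map. */
final class SharedDistinctAggregates {
    // Shared distinct map: value -> how many times the value is currently present.
    private final Map<Long, Long> distinctMap = new HashMap<>();
    private long count;
    private long sum;

    /** Updates both aggregates only when the value is seen for the first time. */
    void accumulate(long value) {
        long occurrences = distinctMap.merge(value, 1L, Long::sum);
        if (occurrences == 1L) {
            count++;
            sum += value;
        }
    }

    /** Retracts one occurrence; the aggregates change only when the last occurrence disappears. */
    void retract(long value) {
        Long occurrences = distinctMap.get(value);
        if (occurrences == null) {
            return; // nothing to retract
        }
        if (occurrences == 1L) {
            distinctMap.remove(value);
            count--;
            sum -= value;
        } else {
            distinctMap.put(value, occurrences - 1);
        }
    }

    long countDistinct() {
        return count;
    }

    long sumDistinct() {
        return sum;
    }
}
```

Because the map is decoupled from the two accumulators, the distinct bookkeeping is paid once per input value rather than once per aggregate.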





[jira] [Commented] (FLINK-10993) Bring bloomfilter as a public API

2018-11-29 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702930#comment-16702930
 ] 

Fabian Hueske commented on FLINK-10993:
---

I like the proposal of FLINK-8601. It seems like a very useful addition. 

[~yanghua], does FLINK-8601 provide all the features that you were looking for 
with this Jira?
If yes (or mostly yes), can we close this issue and move the discussion over to 
FLINK-8601?


> Bring bloomfilter as a public API
> -
>
> Key: FLINK-10993
> URL: https://issues.apache.org/jira/browse/FLINK-10993
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> Flink internally provides an implementation of BloomFilter, but only for 
> internal optimization, and does not provide APIs for public access.
> Here is an earlier discussion from the user mailing list: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Bloom-filter-in-Flink-td10608.html
> Considering that many users have the need to "determine duplicates" in 
> streaming computing, I think it would make sense to provide such an API.
> In addition, Spark has provided BloomFilter as a public API : 
> {code:java}
> val bf = df.stat.bloomFilter("dd",dataLen,0.01)
> val rightNum = rdd.map(x=>(x.toInt,bf.mightContainString(x)))
> {code}
>  
>  





[jira] [Commented] (FLINK-11025) Connector shading is inconsistent

2018-11-29 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702929#comment-16702929
 ] 

Chesnay Schepler commented on FLINK-11025:
--

In addition, there are some modules where we also create a fat 
`sql-jar`, which just adds to this mess.

> Connector shading is inconsistent
> -
>
> Key: FLINK-11025
> URL: https://issues.apache.org/jira/browse/FLINK-11025
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats, Build System, 
> Streaming Connectors
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> I've had a look at our connectors, and our shading practices are very 
> inconsistent. Connectors either:
>  # don't shade anything at all
>  # shade dependencies that are prone to causing conflicts (like guava)
>  # shade everything
> Examples:
>  # nifi, hbase, jdbc, orc, ES6, rabbitmq
>  # Cassandra, Kafka 0.8
>  # Kafka 0.9+, ES1,2,5, Kinesis, twitter
>  





[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE

2018-11-29 Thread GitBox
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237422016
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ##
 @@ -1169,6 +1169,94 @@ object AggregateUtil {
 (propPos._1, propPos._2, propPos._3)
   }
 
+  private[flink] def transformToAggregateFunction(
+  aggregateCall: SqlAggFunction,
+  isDistinct: Boolean,
+  aggregateInputType: Seq[RelDataType],
+  needRetraction: Boolean,
+  tableConfig: TableConfig,
+  isStateBackedDataViews: Boolean = false,
 
 Review comment:
   Remove the default here and at other places. Implementers should explicitly 
state what they want.




[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702971#comment-16702971
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237422674
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ##
 @@ -1169,6 +1169,94 @@ object AggregateUtil {
 (propPos._1, propPos._2, propPos._3)
   }
 
+  private[flink] def transformToAggregateFunction(
+  aggregateCall: SqlAggFunction,
+  isDistinct: Boolean,
+  aggregateInputType: Seq[RelDataType],
+  needRetraction: Boolean,
+  tableConfig: TableConfig,
+  isStateBackedDataViews: Boolean = false,
+  index: Int = 0)
 
 Review comment:
   Add more documentation to this function? What is `index` good for?




> Support aggregation functions in the define and measures clause of 
> MatchRecognize
> -
>
> Key: FLINK-7599
> URL: https://issues.apache.org/jira/browse/FLINK-7599
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>






[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702967#comment-16702967
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237420010
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ##
 @@ -1185,331 +1273,281 @@ object AggregateUtil {
 val aggFieldIndexes = new Array[Array[Int]](aggregateCalls.size)
 val aggregates = new Array[TableAggregateFunction[_ <: Any, _ <: 
Any]](aggregateCalls.size)
 val accTypes = new Array[TypeInformation[_]](aggregateCalls.size)
+val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+val isDistinctAggs = new Array[Boolean](aggregateCalls.size)
 
-// create aggregate function instances by function type and aggregate 
field data type.
 
 Review comment:
   Restore this and the following line?




> Support aggregation functions in the define and measures clause of 
> MatchRecognize
> -
>
> Key: FLINK-7599
> URL: https://issues.apache.org/jira/browse/FLINK-7599
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>






[jira] [Commented] (FLINK-7599) Support aggregation functions in the define and measures clause of MatchRecognize

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702972#comment-16702972
 ] 

ASF GitHub Bot commented on FLINK-7599:
---

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237422016
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ##
 @@ -1169,6 +1169,94 @@ object AggregateUtil {
 (propPos._1, propPos._2, propPos._3)
   }
 
+  private[flink] def transformToAggregateFunction(
+  aggregateCall: SqlAggFunction,
+  isDistinct: Boolean,
+  aggregateInputType: Seq[RelDataType],
+  needRetraction: Boolean,
+  tableConfig: TableConfig,
+  isStateBackedDataViews: Boolean = false,
 
 Review comment:
   Remove the default. Implementers should explicitly state what they want.




> Support aggregation functions in the define and measures clause of 
> MatchRecognize
> -
>
> Key: FLINK-7599
> URL: https://issues.apache.org/jira/browse/FLINK-7599
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>






[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702988#comment-16702988
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r237426646
 
 

 ##
 File path: flink-formats/flink-sequencefile/pom.xml
 ##
 @@ -0,0 +1,107 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-formats
+   1.7-SNAPSHOT
+   ..
+   
+
+   flink-sequencefile
+   flink-sequencefile
+
+   jar
+
+   
+
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   provided
+   
+
+   
+   
+   org.apache.flink
+   flink-shaded-hadoop2
+   ${project.version}
+   provided
+   
+   
+   org.apache.flink
+   flink-hadoop-fs
+   ${project.version}
+   provided
+   
+
+
+   
+
+   
+   org.apache.flink
+   flink-test-utils_2.11
+   ${project.version}
 
 Review comment:
   Replace the hardcoded scala version with `_${scala.binary.version}`.




> Support SequenceFile for StreamingFileSink
> --
>
> Key: FLINK-10457
> URL: https://issues.apache.org/jira/browse/FLINK-10457
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Jihyun Cho
>Priority: Major
>  Labels: pull-request-available
>
> SequenceFile is a major file format in the Hadoop ecosystem.
> It is simple to manage and easy to combine with other tools.
> So we still need the SequenceFile format, even though Parquet and ORC are 
> already supported.





[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702989#comment-16702989
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r237427090
 
 

 ##
 File path: 
flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
 ##
 @@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link BulkWriter} implementation that wraps a {@link 
SequenceFile.Writer}.
+ *
+ * @param <K> The type of key written.
+ * @param <V> The type of value written.
+ */
+@PublicEvolving
+public class SequenceFileWriter<K extends Writable, V extends Writable> 
implements BulkWriter<Tuple2<K, V>> {
+   private final SequenceFile.Writer writer;
+
+   public SequenceFileWriter(SequenceFile.Writer writer) {
+       this.writer = checkNotNull(writer, "sequenceFileWriter");
+   }
+
+   @Override
+   public void addElement(Tuple2<K, V> element) throws IOException {
+
 
 Review comment:
   Remove empty line.




> Support SequenceFile for StreamingFileSink
> --
>
> Key: FLINK-10457
> URL: https://issues.apache.org/jira/browse/FLINK-10457
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Jihyun Cho
>Priority: Major
>  Labels: pull-request-available
>
> SequenceFile is a major file format in the Hadoop ecosystem.
> It is simple to manage and easy to combine with other tools.
> So we still need the SequenceFile format, even though Parquet and ORC are 
> already supported.





[GitHub] kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink

2018-11-29 Thread GitBox
kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r237427090
 
 

 ##
 File path: 
flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
 ##
 @@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link BulkWriter} implementation that wraps a {@link 
SequenceFile.Writer}.
+ *
+ * @param <K> The type of key written.
+ * @param <V> The type of value written.
+ */
+@PublicEvolving
+public class SequenceFileWriter<K extends Writable, V extends Writable> 
implements BulkWriter<Tuple2<K, V>> {
+   private final SequenceFile.Writer writer;
+
+   public SequenceFileWriter(SequenceFile.Writer writer) {
+       this.writer = checkNotNull(writer, "sequenceFileWriter");
+   }
+
+   @Override
+   public void addElement(Tuple2<K, V> element) throws IOException {
+
 
 Review comment:
   Remove empty line.




[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702987#comment-16702987
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r237426910
 
 

 ##
 File path: flink-formats/flink-sequencefile/pom.xml
 ##
 @@ -0,0 +1,107 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-formats
+   1.7-SNAPSHOT
+   ..
+   
+
+   flink-sequencefile
+   flink-sequencefile
+
+   jar
+
+   
+
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   provided
+   
+
+   
+   
+   org.apache.flink
+   flink-shaded-hadoop2
+   ${project.version}
+   provided
+   
+   
+   org.apache.flink
+   flink-hadoop-fs
+   ${project.version}
+   provided
+   
+
+
+   
+
+   
+   org.apache.flink
+   flink-test-utils_2.11
+   ${project.version}
+   test
+   
+
+   
+   org.apache.flink
+   flink-streaming-java_2.11
+   ${project.version}
 
 Review comment:
   Same here.




> Support SequenceFile for StreamingFileSink
> --
>
> Key: FLINK-10457
> URL: https://issues.apache.org/jira/browse/FLINK-10457
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Jihyun Cho
>Priority: Major
>  Labels: pull-request-available
>
> SequenceFile is a major file format in the Hadoop ecosystem.
> It is simple to manage and easy to combine with other tools.
> So we still need the SequenceFile format, even though Parquet and ORC are 
> already supported.





[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702991#comment-16702991
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r237426794
 
 

 ##
 File path: flink-formats/flink-sequencefile/pom.xml
 ##
 @@ -0,0 +1,107 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-formats
+   1.7-SNAPSHOT
+   ..
 
 Review comment:
   Update the version to `1.8-SNAPSHOT`




> Support SequenceFile for StreamingFileSink
> --
>
> Key: FLINK-10457
> URL: https://issues.apache.org/jira/browse/FLINK-10457
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Jihyun Cho
>Priority: Major
>  Labels: pull-request-available
>
> SequenceFile is a major file format in the Hadoop ecosystem.
> It is simple to manage and easy to combine with other tools.
> So we still need the SequenceFile format, even though Parquet and ORC are 
> already supported.





[jira] [Commented] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702992#comment-16702992
 ] 

ASF GitHub Bot commented on FLINK-10457:


kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r237428128
 
 

 ##
 File path: 
flink-formats/flink-sequencefile/src/test/java/org/apache/flink/formats/sequencefile/FiniteTestSource.java
 ##
 @@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
 
 Review comment:
   This class seems like a copy from the `flink-parquet` module. It may make 
sense to create a `utils` module to avoid this duplication.




> Support SequenceFile for StreamingFileSink
> --
>
> Key: FLINK-10457
> URL: https://issues.apache.org/jira/browse/FLINK-10457
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Jihyun Cho
>Priority: Major
>  Labels: pull-request-available
>
> SequenceFile is a major file format in the Hadoop ecosystem.
> It is simple to manage and easy to combine with other tools.
> So we still need the SequenceFile format, even though Parquet and ORC are 
> already supported.





[GitHub] kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink

2018-11-29 Thread GitBox
kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r237426973
 
 

 ##
 File path: flink-formats/flink-sequencefile/pom.xml
 ##
 @@ -0,0 +1,107 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-formats
+   1.7-SNAPSHOT
+   ..
+   
+
+   flink-sequencefile
+   flink-sequencefile
+
+   jar
+
+   
+
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   provided
+   
+
+   
+   
+   org.apache.flink
+   flink-shaded-hadoop2
+   ${project.version}
+   provided
+   
+   
+   org.apache.flink
+   flink-hadoop-fs
+   ${project.version}
+   provided
+   
+
+
+   
+
+   
+   org.apache.flink
+   flink-test-utils_2.11
+   ${project.version}
+   test
+   
+
+   
+   org.apache.flink
+   flink-streaming-java_2.11
+   ${project.version}
+   test
+   
+
+   
+   org.apache.flink
+   flink-hadoop-compatibility_2.11
+   ${project.version}
 
 Review comment:
   Same here.




[jira] [Commented] (FLINK-10597) Enable UDFs support in MATCH_RECOGNIZE

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702986#comment-16702986
 ] 

ASF GitHub Bot commented on FLINK-10597:


twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237428381
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##
 @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends 
StreamingWithStateTestBase {
 // We do not assert the proctime in the result, cause it is currently
 
 Review comment:
   We just introduced a new rich class. I think it would also be a perfect time 
to think about this shortcoming.




> Enable UDFs support in MATCH_RECOGNIZE
> --
>
> Key: FLINK-10597
> URL: https://issues.apache.org/jira/browse/FLINK-10597
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>






[GitHub] kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink

2018-11-29 Thread GitBox
kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r237426910
 
 

 ##
 File path: flink-formats/flink-sequencefile/pom.xml
 ##
 @@ -0,0 +1,107 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-formats
+   1.7-SNAPSHOT
+   ..
+   
+
+   flink-sequencefile
+   flink-sequencefile
+
+   jar
+
+   
+
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   provided
+   
+
+   
+   
+   org.apache.flink
+   flink-shaded-hadoop2
+   ${project.version}
+   provided
+   
+   
+   org.apache.flink
+   flink-hadoop-fs
+   ${project.version}
+   provided
+   
+
+
+   
+
+   
+   org.apache.flink
+   flink-test-utils_2.11
+   ${project.version}
+   test
+   
+
+   
+   org.apache.flink
+   flink-streaming-java_2.11
+   ${project.version}
 
 Review comment:
   Same here.




[jira] [Comment Edited] (FLINK-10999) Adding time multiple times causes Runtime : java.sql.Timestamp cannot be cast to java.lang.Long

2018-11-29 Thread xuqianjin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702983#comment-16702983
 ] 

xuqianjin edited comment on FLINK-10999 at 11/29/18 10:25 AM:
--

Hi [~fhueske], thank you very much. I can indeed use both rowtime and proctime 
in one table, but I converted and executed the job following his code step by 
step: proctime is added to the first table, and rowtime is added to the second 
table after converting to a stream. With that order, I find that the execution 
plan is missing one layer of rowtime conversion.


was (Author: x1q1j1):
Hi [~fhueske], thank you very much. I can indeed use both rowtime and proctime 
in one table, but I converted and executed the job following his code step by 
step: first proctime and then rowtime. With that order, I find that the 
execution plan is missing one layer of rowtime conversion.

> Adding time multiple times causes Runtime : java.sql.Timestamp cannot be cast 
> to java.lang.Long
> ---
>
> Key: FLINK-10999
> URL: https://issues.apache.org/jira/browse/FLINK-10999
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.5.5, 1.6.2
> Reporter: ideal-hp
> Priority: Major
>
> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long
>  at org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:27)
>  at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:95)
>  at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:46)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>  at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
>  at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>  at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>  at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>  at DataStreamCalcRule$15.processElement(Unknown Source)
>  at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:66)
>  at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>  at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)





[GitHub] kl0u commented on a change in pull request #6774: [FLINK-10457] Support SequenceFile for StreamingFileSink

2018-11-29 Thread GitBox
kl0u commented on a change in pull request #6774: [FLINK-10457] Support 
SequenceFile for StreamingFileSink
URL: https://github.com/apache/flink/pull/6774#discussion_r237428128
 
 

 ##
 File path: 
flink-formats/flink-sequencefile/src/test/java/org/apache/flink/formats/sequencefile/FiniteTestSource.java
 ##
 @@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
 
 Review comment:
   This class seems like a copy from the `flink-parquet` module. It may make 
sense to create a `utils` module to avoid this duplication.



