[GitHub] fhueske commented on issue #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream

2018-08-09 Thread GitBox
fhueske commented on issue #6521: [FLINK-5315][table] Adding support for 
distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#issuecomment-411683146
 
 
   Thanks for the PR @walterddr!
   I did not have a detailed look, but we would also need to add support for the Java Table API, which parses Strings into Expressions.
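   
   For context, a minimal sketch (not part of the PR) of what parsing Strings into Expressions looks like, assuming the `ExpressionParser` used by the Java Table API:
   
   ```scala
   import org.apache.flink.table.expressions.{Expression, ExpressionParser}
   
   object StringApiSketch {
     // The Java Table API receives plain Strings such as "a, b.count as cnt"
     // and turns them into the typed Expressions the Scala API gets directly.
     def parse(fields: String): Seq[Expression] =
       ExpressionParser.parseExpressionList(fields)
   
     def main(args: Array[String]): Unit =
       parse("a, b.count as cnt").foreach(println)
   }
   ```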
   
   Cheers, Fabian


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dawidwys opened a new pull request #6529: [FLINK-10106][e2e] Included test name in temp directory of e2e test

2018-08-09 Thread GitBox
dawidwys opened a new pull request #6529: [FLINK-10106][e2e] Included test name 
in temp directory of e2e test
URL: https://github.com/apache/flink/pull/6529
 
 
   A simple improvement to the debuggability of the e2e tests.




[GitHub] zentol commented on a change in pull request #6529: [FLINK-10106][e2e] Included test name in temp directory of e2e test

2018-08-09 Thread GitBox
zentol commented on a change in pull request #6529: [FLINK-10106][e2e] Included 
test name in temp directory of e2e test
URL: https://github.com/apache/flink/pull/6529#discussion_r208858966
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test-runner-common.sh
 ##
 @@ -29,12 +29,14 @@ function run_test {
 description="$1"
 command="$2"
 
+name=$(basename "${command}" | sed -e 's/[_ ]/-/g' | sed -n 's/\.sh//p')
 
 Review comment:
   why not base it on the description instead? That should be more informative, since the same script is shared by multiple tests.




[GitHub] tragicjun commented on issue #6508: [Flink-10079] [table] Automatically register sink table from external catalogs

2018-08-09 Thread GitBox
tragicjun commented on issue #6508: [Flink-10079] [table] Automatically 
register sink table from external catalogs 
URL: https://github.com/apache/flink/pull/6508#issuecomment-411685212
 
 
   @twalthr @suez1224 could you please review?




[GitHub] dawidwys commented on issue #6529: [FLINK-10106][e2e] Included test name in temp directory of e2e test

2018-08-09 Thread GitBox
dawidwys commented on issue #6529: [FLINK-10106][e2e] Included test name in 
temp directory of e2e test
URL: https://github.com/apache/flink/pull/6529#issuecomment-411696699
 
 
   It does not work well when one of the parameters is a URL/URI. Fixing.




[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-08-09 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r208830206
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
 ##
 @@ -46,6 +46,8 @@ class FlinkRelBuilder(
 relOptCluster,
 relOptSchema) {
 
+  def getRelOptSchema: RelOptSchema = relOptSchema
 
 Review comment:
   Yes, check for `getRelOptSchema` usage in this commit




[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-08-09 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r208831842
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalVersionedJoin.scala
 ##
 @@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util.Collections
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core._
+import org.apache.calcite.rex.{RexBuilder, RexNode}
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes}
+import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
+import org.apache.flink.util.Preconditions.checkArgument
+
+/**
+  * Represents a join between a table and 
[[org.apache.flink.table.functions.TableVersionFunction]]
+  *
+  * @param cluster
+  * @param traitSet
+  * @param left stream
+  * @param right table scan of underlying 
[[org.apache.flink.table.functions.TableVersionFunction]]
+  * @param condition must contain [[LogicalVersionedJoin#VERSIONING_JOIN_CONDITION()]] with
+  *  correctly defined references to rightVersionExpression,
+  *  rightPrimaryKeyExpression and leftVersionExpression. We can not implement
+  *  those references as separate fields, because of problems with Calcite's
+  *  optimization rules like projection push-downs, column
+  *  pruning/renaming/reordering, etc. Later rightVersionExpression,
+  *  rightPrimaryKeyExpression and leftVersionExpression will be extracted from
+  *  the condition.
+  */
+class LogicalVersionedJoin private (
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+left: RelNode,
+right: RelNode,
+condition: RexNode)
+  extends Join(
+cluster,
+traitSet,
+left,
+right,
+condition,
+Collections.emptySet().asInstanceOf[java.util.Set[CorrelationId]],
+JoinRelType.INNER) {
+
+  override def copy(
+   traitSet: RelTraitSet,
+   condition: RexNode,
+   left: RelNode,
+   right: RelNode,
+   joinType: JoinRelType,
+   semiJoinDone: Boolean): LogicalVersionedJoin = {
+checkArgument(joinType == this.getJoinType,
+  "Can not change join type".asInstanceOf[Object])
+checkArgument(semiJoinDone == this.isSemiJoinDone,
+  "Can not change semiJoinDone".asInstanceOf[Object])
+new LogicalVersionedJoin(
+  cluster,
+  traitSet,
+  left,
+  right,
+  condition)
+  }
+}
+
+object LogicalVersionedJoin {
+  /**
+* See [[LogicalVersionedJoin#condition]]
+*/
+  val VERSIONING_JOIN_CONDITION = new SqlFunction(
 
 Review comment:
   We want and need to use Calcite's join optimisations (like predicate pushdown, column pruning, etc.). To enable that, `LogicalVersionedJoin` extends Calcite's `Join`. However, there is no way to extend `Join`'s semantics so that, besides the actual join condition, it carries the extra fields we need to handle versioning. The only way to do this is to expose `leftVersionExpression`, `rightVersionExpression` and `rightPrimaryKeyExpression` via this `VERSIONING_JOIN_CONDITION`. Otherwise, if we kept those expressions in some hidden state, they could be pruned/renamed/reordered/... .
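   
   A rough sketch of that idea (an assumed shape for illustration, not the PR's exact code): the auxiliary expressions are folded into the join condition as an ordinary `RexCall`, so Calcite's rules rewrite them together with the rest of the condition:
   
   ```scala
   import org.apache.calcite.rex.{RexBuilder, RexNode}
   import org.apache.calcite.sql.SqlFunction
   import org.apache.calcite.sql.fun.SqlStdOperatorTable
   
   object VersioningConditionSketch {
     // Wrap the three auxiliary expressions in a call to the marker function
     // and AND it into the real join condition; projection/pruning rules then
     // treat the references like any other input refs of the condition.
     def embed(
         rexBuilder: RexBuilder,
         versioningJoinCondition: SqlFunction,
         equiCondition: RexNode,
         leftVersion: RexNode,
         rightVersion: RexNode,
         rightPrimaryKey: RexNode): RexNode = {
       val marker = rexBuilder.makeCall(
         versioningJoinCondition, leftVersion, rightVersion, rightPrimaryKey)
       rexBuilder.makeCall(SqlStdOperatorTable.AND, equiCondition, marker)
     }
   }
   ```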




[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-08-09 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r208833159
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToVersionedJoinRule.scala
 ##
 @@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import java.util
+import java.util.Collections
+
+import org.apache.calcite.plan.RelOptRule.{any, none, operand, some}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.TableFunctionScan
+import org.apache.calcite.rel.logical.LogicalCorrelate
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlCastFunction
+import org.apache.flink.table.api.{Table, ValidationException}
+import org.apache.flink.table.functions.TableVersionFunction
+import org.apache.flink.table.functions.sql.ProctimeSqlFunction
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.logical.rel.LogicalVersionedJoin
+import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType
+import org.apache.flink.table.plan.util.RexDefaultVisitor
+import org.apache.flink.table.typeutils.TypeStringUtils
+import org.apache.flink.util.Preconditions.checkState
+
+class LogicalCorrelateToVersionedJoinRule
+  extends RelOptRule(
+operand(classOf[LogicalCorrelate],
+  some(
+operand(classOf[RelNode], any()),
+operand(classOf[TableFunctionScan], none()))),
+"LogicalCorrelateToVersionedJoinRule") {
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val logicalCorrelate: LogicalCorrelate = call.rel(0)
+val leftNode: RelNode = call.rel(1)
+val rightTableFunctionScan: TableFunctionScan = call.rel(2)
+
+val cluster = logicalCorrelate.getCluster
+
+new GetTableVersionFunctionCall(cluster.getRexBuilder, leftNode)
+  .visit(rightTableFunctionScan.getCall) match {
+  case Some(rightVersionedTableCall) =>
+val underlyingTableHistory: Table = 
rightVersionedTableCall.tableVersionFunction.table
+val relBuilder = this.relBuilderFactory.create(
+  cluster,
+  underlyingTableHistory.relBuilder.getRelOptSchema)
+val rexBuilder = cluster.getRexBuilder
+
+val rightNode: RelNode = 
underlyingTableHistory.logicalPlan.toRelNode(relBuilder)
+
+val rightVersionExpression = createRightExpression(
+  rexBuilder,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.tableVersionFunction.versionField)
+
+val rightPrimaryKeyExpression = createRightExpression(
+  rexBuilder,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.tableVersionFunction.primaryKey)
+
+relBuilder.push(
+  if (rightVersionedTableCall.tableVersionFunction.isProctime) {
+LogicalVersionedJoin.create(
+  rexBuilder,
+  cluster,
+  logicalCorrelate.getTraitSet,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.versionExpression,
+  rightPrimaryKeyExpression)
+  }
+  else {
+LogicalVersionedJoin.create(
+  rexBuilder,
+  cluster,
+  logicalCorrelate.getTraitSet,
+  leftNode,
+  rightNode,
+  rightVersionedTableCall.versionExpression,
+  rightVersionExpression,
+  rightPrimaryKeyExpression)
+  })
+call.transformTo(relBuilder.build())
+  case None =>
+}
+  }
+
+  private def createRightExpression(
+  rexBuilder: RexBuilder,
+  leftNode: RelNode,
+  rightNode: RelNode,
+  field: String): RexNode = {
+val rightReferencesOffset = leftNode.getRowType.getFieldCount
+val rightDataTypeField = rightNode.getRowType.getField(field, false, false)
+rexBuilder.makeInputRef(
+  rightDataTypeField.getType, rightReferencesOffset + 
rightDataTypeField.getIndex)
+  }
+}
+

[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-08-09 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r208833101
 
 


[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-08-09 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r208835846
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
 ##
 @@ -167,28 +167,28 @@ class JoinHarnessTest extends HarnessTestBase {
 
 testHarness.setProcessingTime(1)
 testHarness.processElement1(new StreamRecord(
-  CRow(Row.of(1L: JLong, "1a1"), change = true), 1))
+  CRow(1L: JLong, "1a1"), 1))
 
 Review comment:
   Again, I'm depending on this change in the following commit. What harm does 
it do here?
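   
   For readers of the diff, a sketch of the kind of convenience factory this change relies on (a hypothetical shape; the PR's actual helper may differ):
   
   ```scala
   import org.apache.flink.table.runtime.types.CRow
   import org.apache.flink.types.Row
   
   object CRowSketch {
     // Build a CRow straight from field values, defaulting change = true, so a
     // test can write CRow(1L: JLong, "1a1") instead of
     // CRow(Row.of(1L: JLong, "1a1"), change = true).
     def apply(values: AnyRef*): CRow = CRow(Row.of(values: _*), change = true)
   }
   ```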




[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-08-09 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r208835295
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ##
 @@ -103,27 +97,42 @@ class DataStreamJoin(
   tableEnv: StreamTableEnvironment,
   queryConfig: StreamQueryConfig): DataStream[CRow] = {
 
-val config = tableEnv.getConfig
-val returnType = schema.typeInfo
-val keyPairs = joinInfo.pairs().toList
+validateKeyTypes()
 
-// get the equality keys
-val leftKeys = ArrayBuffer.empty[Int]
-val rightKeys = ArrayBuffer.empty[Int]
+val leftDataStream =
 
 Review comment:
   I would be against that. I'm depending on this change in this PR, so it defines a strict order of those two commits. Either I would have to hold off publishing the actual versioned joins PR until this commit is merged, or the versioned joins PR would look exactly the same as it looks now. Splitting into two PRs adds a lot of overhead and room for problems with PRs going out of sync. During review it's also annoying, because you see the same code twice, and reviewers (same or different ones) often (almost always?) make half of the comments in one PR and half in the other.
   
   Besides, this is already in a separate commit, so it can be reviewed separately. Maybe instead of reviewing the PR all at once, try reviewing it commit by commit?




[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-08-09 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r208829502
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -155,6 +158,90 @@ class Table(
 select(withResolvedAggFunctionCall: _*)
   }
 
+  /**
+* Creates a table version function backed by this table as a history table.
+*
+* @param version field for the [[TableVersionFunction]]. Must point to a time indicator.
+* @param primaryKey field for the [[TableVersionFunction]].
+* @return [[TableVersionFunction]]
+*/
+  def createTableVersionFunction(
+  version: String,
+  primaryKey: String): TableVersionFunction = {
 
 Review comment:
   In the first version, @fhueske and I didn't want to focus on this, and we agreed to do it as a follow-up task.




[GitHub] twalthr opened a new pull request #6528: [FLINK-10107] [e2e] Exclude conflicting SQL JARs from test

2018-08-09 Thread GitBox
twalthr opened a new pull request #6528: [FLINK-10107] [e2e] Exclude 
conflicting SQL JARs from test
URL: https://github.com/apache/flink/pull/6528
 
 
   ## What is the purpose of the change
   
   This is a temporary solution for fixing the SQL Client end-to-end test for 
releases.
   
   
   ## Brief change log
   
   - Do not include conflicting SQL JARs in library folder of test.
   
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   




[GitHub] dawidwys commented on a change in pull request #6529: [FLINK-10106][e2e] Included test name in temp directory of e2e test

2018-08-09 Thread GitBox
dawidwys commented on a change in pull request #6529: [FLINK-10106][e2e] 
Included test name in temp directory of e2e test
URL: https://github.com/apache/flink/pull/6529#discussion_r208860682
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test-runner-common.sh
 ##
 @@ -29,12 +29,14 @@ function run_test {
 description="$1"
 command="$2"
 
+name=$(basename "${command}" | sed -e 's/[_ ]/-/g' | sed -n 's/\.sh//p')
 
 Review comment:
   I chose the script name because the descriptions right now are rather long, usually with a long common prefix, and some of them are still the same for different test runs, e.g. `Local recovery and sticky scheduling end-to-end test`.
   
   The command contains the script parameters as well, so it should be even easier to differentiate runs this way than by using the description.
   
   Example dir name: `temp-test-resume-savepoint-2-2-file-true-34211378012`




[GitHub] tillrohrmann opened a new pull request #6531: [FLINK-10110][tests] Harden Kafka component shut down for E2E tests

2018-08-09 Thread GitBox
tillrohrmann opened a new pull request #6531: [FLINK-10110][tests] Harden Kafka 
component shut down for E2E tests
URL: https://github.com/apache/flink/pull/6531
 
 
   ## What is the purpose of the change
   
   Instead of only calling kafka-server-stop.sh and zookeeper-server-stop.sh, which can fail due to KAFKA-4931, we also use jps to kill the Kafka and ZooKeeper processes.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




[GitHub] rickhofstede commented on issue #6503: [FLINK-10072] [docs] Syntax and consistency issues in "The Broadcast State Pattern"

2018-08-09 Thread GitBox
rickhofstede commented on issue #6503: [FLINK-10072] [docs] Syntax and 
consistency issues in "The Broadcast State Pattern"
URL: https://github.com/apache/flink/pull/6503#issuecomment-411703560
 
 
   @yanghua 
   
   Since you've reviewed my commits, please let me ask you the following. It appears that Travis doesn't like the last two commits in this pull request. This seems odd to me, since they introduce no functional changes to the code at all. Can you comment on what is going wrong there, so that I can fix it?




[GitHub] fhueske opened a new pull request #6527: [hotfix] [docs] Fix ProcessWindowFunction code snippets.

2018-08-09 Thread GitBox
fhueske opened a new pull request #6527: [hotfix] [docs] Fix 
ProcessWindowFunction code snippets.
URL: https://github.com/apache/flink/pull/6527
 
 
   ## What is the purpose of the change
   
   - Fix the code snippets for the `ProcessWindowFunction` in the documentation
   
   ## Brief change log
   
   Problems with the examples:
   
   - Java: Tuple does not take generic parameters (`Tuple`)
   - Java: `process()` method must be `public`
   - `MyProcessWindowFunction` expects a `String` key, but no key extractor is used
   - `MyProcessWindowFunction` expects `TimeWindow`, but no window assigner is defined (see the corrected sketch below)
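   
   For illustration, a corrected snippet in the spirit of the fix (a sketch, not the PR's exact docs change), where the key selector yields the `String` key and the window assigner yields `TimeWindow`:
   
   ```scala
   import org.apache.flink.streaming.api.scala._
   import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
   import org.apache.flink.streaming.api.windowing.time.Time
   import org.apache.flink.streaming.api.windowing.windows.TimeWindow
   import org.apache.flink.util.Collector
   
   class MyProcessWindowFunction
       extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
     override def process(
         key: String,
         context: Context,
         elements: Iterable[(String, Long)],
         out: Collector[String]): Unit =
       out.collect(s"Window ${context.window} count: ${elements.size}")
   }
   
   // usage: keyed by the String field, with a TimeWindow assigner
   // input.keyBy(_._1).timeWindow(Time.minutes(5)).process(new MyProcessWindowFunction)
   ```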
   
   ## Verifying this change
   
   Copy and paste the code into an IDE.
   
   ## Does this pull request potentially affect one of the following parts:
   
   No
   
   ## Documentation
   
   Fix for the docs






[GitHub] tillrohrmann opened a new pull request #6530: [FLINK-10110][tests] Harden Kafka component shut down for E2E tests

2018-08-09 Thread GitBox
tillrohrmann opened a new pull request #6530: [FLINK-10110][tests] Harden Kafka 
component shut down for E2E tests
URL: https://github.com/apache/flink/pull/6530
 
 
   ## What is the purpose of the change
   
   Instead of only calling kafka-server-stop.sh and zookeeper-server-stop.sh, which can fail due to KAFKA-4931, we also use jps to kill the Kafka and ZooKeeper processes.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




[GitHub] tweise commented on issue #6482: [FLINK-10020] [kinesis] Support recoverable exceptions in listShards.

2018-08-08 Thread GitBox
tweise commented on issue #6482: [FLINK-10020] [kinesis] Support recoverable 
exceptions in listShards.
URL: https://github.com/apache/flink/pull/6482#issuecomment-411622174
 
 
   @tzulitai PTAL




[GitHub] yanghua commented on a change in pull request #6522: [FLINK-10101][mesos] Add web ui url for mesos.

2018-08-09 Thread GitBox
yanghua commented on a change in pull request #6522: [FLINK-10101][mesos] Add 
web ui url for mesos.
URL: https://github.com/apache/flink/pull/6522#discussion_r208818211
 
 

 ##
 File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 ##
 @@ -120,6 +120,9 @@
/** A local actor system for using the helper actors. */
private final ActorSystem actorSystem;
 
+   /** Web UI URL to show in the Mesos page. */
+   @Nullable private final String webUiUrl;
 
 Review comment:
   For fields, I recommend moving `@Nullable` up one line.




[GitHub] maqingxiang commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8

2018-08-08 Thread GitBox
maqingxiang commented on a change in pull request #5304: [FLINK-8290]Modify 
clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r208790526
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ##
 @@ -123,6 +124,7 @@ public SimpleConsumerThread(
this.fetchSize = getInt(config, "fetch.message.max.bytes", 
1048576);
this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 
65536);
this.reconnectLimit = getInt(config, 
"flink.simple-consumer-reconnectLimit", 3);
+   this.clientId = config.getProperty("group.id", 
"flink-kafka-consumer-legacy-" + broker.id());
 
 Review comment:
   Hi @azagrebin,
   
   group.id and client.id are two separate concepts with different meanings.
   
   I mean that there's no real meaning in a client id named "flink-kafka-consumer-legacy-" + broker.id(). As different topics consumed by different consumers all carry this value, it is impossible to distinguish them and difficult to troubleshoot.
   
   I would suggest exposing the client id to the user, with its default value set to the group.id value.
   
   thanks.
   
   Cheers
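   
   A minimal sketch of the suggestion above (hypothetical property handling, not the PR's code):
   
   ```scala
   import java.util.Properties
   
   object ClientIdSketch {
     // Let users set "client.id" explicitly; otherwise fall back to the
     // "group.id" value instead of a fixed per-broker name, so consumers of
     // different topics can be told apart when troubleshooting.
     def clientId(config: Properties, brokerId: Int): String = {
       val groupId =
         config.getProperty("group.id", "flink-kafka-consumer-legacy-" + brokerId)
       config.getProperty("client.id", groupId)
     }
   }
   ```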




[GitHub] TisonKun commented on issue #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit

2018-08-08 Thread GitBox
TisonKun commented on issue #6490: [FLINK-10056] [test] Add 
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490#issuecomment-411627844
 
 
   cc @aljoscha @zentol 




[GitHub] TisonKun commented on issue #6345: [FLINK-9869][runtime] Send PartitionInfo in batch to Improve perfornance

2018-08-08 Thread GitBox
TisonKun commented on issue #6345: [FLINK-9869][runtime] Send PartitionInfo in 
batch to Improve perfornance
URL: https://github.com/apache/flink/pull/6345#issuecomment-411627672
 
 
   cc @GJL @twalthr 




[GitHub] yanghua edited a comment on issue #6503: [FLINK-10072] [docs] Syntax and consistency issues in "The Broadcast State Pattern"

2018-08-09 Thread GitBox
yanghua edited a comment on issue #6503: [FLINK-10072] [docs] Syntax and 
consistency issues in "The Broadcast State Pattern"
URL: https://github.com/apache/flink/pull/6503#issuecomment-411724977
 
 
   Hi @rickhofstede, the Travis build error is not caused by your code, so take it easy. The next time you encounter a problem, you can follow these steps: first, click on the Travis failure link to view the failure log. Then analyze the cause of the failure and whether it was caused by your change. If it was, fix it; if not, you can go to JIRA to report the issue. I have reported an issue for this exception, see FLINK-10111.
   
   You can also push an empty commit to make Travis rebuild.




[GitHub] azagrebin commented on a change in pull request #5996: [FLINK-9343] [Example] Add Async Example with External Rest API call

2018-08-09 Thread GitBox
azagrebin commented on a change in pull request #5996: [FLINK-9343] [Example] 
Add Async Example with External Rest API call
URL: https://github.com/apache/flink/pull/5996#discussion_r208905291
 
 

 ##
 File path: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncAPIExample.scala
 ##
 @@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.async
+
+
+import java.io.{BufferedReader, InputStreamReader}
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.runtime.concurrent.Executors
+import org.apache.flink.streaming.api.scala.{AsyncDataStream, DataStream, 
StreamExecutionEnvironment, _}
+import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
+import org.apache.http.client.methods.HttpGet
+import org.apache.http.impl.client.HttpClientBuilder
+
+import scala.concurrent.{ExecutionContext, Future}
+
+object AsyncAPIExample {
+
+  def main(args: Array[String]) {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+val input : DataStream[Int] = env.addSource(new SimpleSource())
+
+val quoteStream: DataStream[(Int, String)] = AsyncDataStream.unorderedWait(
+  input,
+  new AsyncQuoteRequest,
+  1000,
+  TimeUnit.MILLISECONDS,
+  10)
+
+quoteStream.print()
+
+env.execute("Async API job")
+  }
+}
+
+class AsyncQuoteRequest extends AsyncFunction[Int, (Int, String)] {
+
+  /** The API specific client that can issue concurrent requests with 
callbacks */
+
+  /** The context used for the future callbacks */
+  implicit lazy val executor: ExecutionContext = ExecutionContext.global
+
+  lazy val client = new Quote()
+
+  override def asyncInvoke(input: Int, resultFuture: ResultFuture[(Int, 
String)]): Unit = {
+
+
+// issue the asynchronous request, receive a future for the result
+val resultFutureRequested: Future[String] = Future {
+  client.getQuote(input.toString)
+}
+
+// set the callback to be executed once the request by the client is 
complete
+// the callback simply forwards the result to the result future
+resultFutureRequested.onSuccess {
+  case result: String => {
+resultFuture.complete(Iterable((input, result)))
+  }
+}
+  }
+
+
+}
+
+class Quote {
+  @throws[Exception]
+  def getQuote(number: String) : String = {
+val url = "http://gturnquist-quoters.cfapps.io/api/" + number
+val client = HttpClientBuilder.create.build
+val request = new HttpGet(url)
+val response = client.execute(request)
+val rd = new BufferedReader(new 
InputStreamReader(response.getEntity.getContent))
+rd.readLine
 
 Review comment:
   I think it would be nice to close `client` before returning the result
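   
   A possible shape for that (a sketch assuming the Apache HttpClient builder used above; `CloseableHttpClient` is closeable):
   
   ```scala
   import java.io.{BufferedReader, InputStreamReader}
   
   import org.apache.http.client.methods.HttpGet
   import org.apache.http.impl.client.HttpClientBuilder
   
   object QuoteSketch {
     // Read the first line of the response and close the client afterwards,
     // even if reading fails.
     def getQuote(number: String): String = {
       val client = HttpClientBuilder.create.build
       try {
         val response = client.execute(new HttpGet(
           "http://gturnquist-quoters.cfapps.io/api/" + number))
         new BufferedReader(
           new InputStreamReader(response.getEntity.getContent)).readLine
       } finally {
         client.close()
       }
     }
   }
   ```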




[GitHub] azagrebin commented on a change in pull request #5996: [FLINK-9343] [Example] Add Async Example with External Rest API call

2018-08-09 Thread GitBox
azagrebin commented on a change in pull request #5996: [FLINK-9343] [Example] 
Add Async Example with External Rest API call
URL: https://github.com/apache/flink/pull/5996#discussion_r208906401
 
 

 ##
 File path: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncAPIExample.scala
 ##
 @@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.async
+
+
+import java.io.{BufferedReader, InputStreamReader}
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.runtime.concurrent.Executors
+import org.apache.flink.streaming.api.scala.{AsyncDataStream, DataStream, 
StreamExecutionEnvironment, _}
+import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
+import org.apache.http.client.methods.HttpGet
+import org.apache.http.impl.client.HttpClientBuilder
+
+import scala.concurrent.{ExecutionContext, Future}
+
+object AsyncAPIExample {
+
+  def main(args: Array[String]) {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+val input : DataStream[Int] = env.addSource(new SimpleSource())
+
+val quoteStream: DataStream[(Int, String)] = AsyncDataStream.unorderedWait(
+  input,
+  new AsyncQuoteRequest,
+  1000,
+  TimeUnit.MILLISECONDS,
+  10)
+
+quoteStream.print()
+
+env.execute("Async API job")
+  }
+}
+
+class AsyncQuoteRequest extends AsyncFunction[Int, (Int, String)] {
+
+  /** The API specific client that can issue concurrent requests with 
callbacks */
+
+  /** The context used for the future callbacks */
+  implicit lazy val executor: ExecutionContext = ExecutionContext.global
+
+  lazy val client = new Quote()
+
+  override def asyncInvoke(input: Int, resultFuture: ResultFuture[(Int, 
String)]): Unit = {
+
+
+// issue the asynchronous request, receive a future for the result
+val resultFutureRequested: Future[String] = Future {
+  client.getQuote(input.toString)
+}
+
+// set the callback to be executed once the request by the client is 
complete
+// the callback simply forwards the result to the result future
+resultFutureRequested.onSuccess {
+  case result: String => {
+resultFuture.complete(Iterable((input, result)))
+  }
+}
+  }
+
+
+}
+
+class Quote {
 
 Review comment:
   `QuoteHttpClient`?




[GitHub] azagrebin commented on a change in pull request #5996: [FLINK-9343] [Example] Add Async Example with External Rest API call

2018-08-09 Thread GitBox
azagrebin commented on a change in pull request #5996: [FLINK-9343] [Example] 
Add Async Example with External Rest API call
URL: https://github.com/apache/flink/pull/5996#discussion_r208905065
 
 

 ##
 File path: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncAPIExample.scala
 ##
 @@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.async
+
+
+import java.io.{BufferedReader, InputStreamReader}
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.runtime.concurrent.Executors
+import org.apache.flink.streaming.api.scala.{AsyncDataStream, DataStream, 
StreamExecutionEnvironment, _}
+import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
+import org.apache.http.client.methods.HttpGet
+import org.apache.http.impl.client.HttpClientBuilder
+
+import scala.concurrent.{ExecutionContext, Future}
+
+object AsyncAPIExample {
+
+  def main(args: Array[String]) {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+val input : DataStream[Int] = env.addSource(new SimpleSource())
+
+val quoteStream: DataStream[(Int, String)] = AsyncDataStream.unorderedWait(
+  input,
+  new AsyncQuoteRequest,
+  1000,
+  TimeUnit.MILLISECONDS,
+  10)
+
+quoteStream.print()
+
+env.execute("Async API job")
+  }
+}
+
+class AsyncQuoteRequest extends AsyncFunction[Int, (Int, String)] {
+
+  /** The API specific client that can issue concurrent requests with 
callbacks */
+
+  /** The context used for the future callbacks */
+  implicit lazy val executor: ExecutionContext = ExecutionContext.global
+
+  lazy val client = new Quote()
+
+  override def asyncInvoke(input: Int, resultFuture: ResultFuture[(Int, 
String)]): Unit = {
+
+
+// issue the asynchronous request, receive a future for the result
+val resultFutureRequested: Future[String] = Future {
+  client.getQuote(input.toString)
+}
 
 Review comment:
   It can be a bit shorter as in `AsyncIOExample`:
   ```
   Future {
     val result = client.getQuote(input.toString)
     resultFuture.complete(Iterable((input, result)))
   } (ExecutionContext.global)
   ```
   then no need for `implicit lazy val executor`




[GitHub] azagrebin commented on a change in pull request #5996: [FLINK-9343] [Example] Add Async Example with External Rest API call

2018-08-09 Thread GitBox
azagrebin commented on a change in pull request #5996: [FLINK-9343] [Example] 
Add Async Example with External Rest API call
URL: https://github.com/apache/flink/pull/5996#discussion_r208904205
 
 

 ##
 File path: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncAPIExample.scala
 ##
 @@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.async
+
+
+import java.io.{BufferedReader, InputStreamReader}
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.runtime.concurrent.Executors
+import org.apache.flink.streaming.api.scala.{AsyncDataStream, DataStream, 
StreamExecutionEnvironment, _}
+import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
+import org.apache.http.client.methods.HttpGet
+import org.apache.http.impl.client.HttpClientBuilder
+
+import scala.concurrent.{ExecutionContext, Future}
+
+object AsyncAPIExample {
 
 Review comment:
   I would suggest to name it `AsyncHttpCallExample` or `AsyncRestAPIExample`




[GitHub] yanghua commented on issue #6503: [FLINK-10072] [docs] Syntax and consistency issues in "The Broadcast State Pattern"

2018-08-09 Thread GitBox
yanghua commented on issue #6503: [FLINK-10072] [docs] Syntax and consistency 
issues in "The Broadcast State Pattern"
URL: https://github.com/apache/flink/pull/6503#issuecomment-411724977
 
 
   Hi @rickhofstede, the Travis build error is not caused by your code, so take it easy. The next time you encounter a problem, you can follow these steps: first, click on the Travis failure link to view the failure log. Then analyze the cause of the failure and whether it was caused by your change. If it was, fix it; if not, you can go to JIRA to report the issue. I have reported an issue for this exception, see FLINK-10111.




[GitHub] dawidwys commented on issue #6511: [FLINK-10085][tests] Update AbstractOperatorRestoreTestBase for 1.5

2018-08-09 Thread GitBox
dawidwys commented on issue #6511: [FLINK-10085][tests] Update 
AbstractOperatorRestoreTestBase for 1.5
URL: https://github.com/apache/flink/pull/6511#issuecomment-411749740
 
 
   Do we have any reasonable way to verify those kinds of PRs? Or shall we just 
look for typos while reviewing it? 
   
   If the latter, then +1.




[GitHub] tillrohrmann closed pull request #5920: [FLINK-9231] [web] Enable SO_REUSEADDR on listen sockets for WebFront…

2018-08-09 Thread GitBox
tillrohrmann closed pull request #5920: [FLINK-9231] [web] Enable SO_REUSEADDR 
on listen sockets for WebFront…
URL: https://github.com/apache/flink/pull/5920
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
index 740beaee1cb..3d8e36b4f3f 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java
@@ -29,6 +29,7 @@
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
 import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
 import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
@@ -103,7 +104,8 @@ protected void initChannel(SocketChannel ch) {
this.bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
-   .childHandler(initializer);
+   .childHandler(initializer)
+   .option(ChannelOption.SO_REUSEADDR, true);
 
ChannelFuture ch;
if (configuredAddress == null) {


 




[GitHub] tillrohrmann commented on issue #5920: [FLINK-9231] [web] Enable SO_REUSEADDR on listen sockets for WebFront…

2018-08-09 Thread GitBox
tillrohrmann commented on issue #5920: [FLINK-9231] [web] Enable SO_REUSEADDR 
on listen sockets for WebFront…
URL: https://github.com/apache/flink/pull/5920#issuecomment-411731732
 
 
   I will close this PR since it seems to be inactive. If you still want to work on it, @trionesadam, please reopen it.




[GitHub] twalthr commented on issue #6379: [FLINK-9637] Add public user documentation for state TTL feature

2018-08-09 Thread GitBox
twalthr commented on issue #6379: [FLINK-9637] Add public user documentation 
for state TTL feature
URL: https://github.com/apache/flink/pull/6379#issuecomment-411756912
 
 
   Thank you @azagrebin. I will go through the text one last time and merge this...




[GitHub] asfgit closed pull request #6528: [FLINK-10107] [e2e] Exclude conflicting SQL JARs from test

2018-08-09 Thread GitBox
asfgit closed pull request #6528: [FLINK-10107] [e2e] Exclude conflicting SQL 
JARs from test
URL: https://github.com/apache/flink/pull/6528
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
index ec5a0e10360..6e69568939f 100644
--- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml
+++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
@@ -153,13 +153,14 @@ under the License.

sql-jar

jar

+   


org.apache.flink

flink-connector-kafka-0.10_${scala.binary.version}
@@ -167,13 +168,14 @@ under the License.

sql-jar

jar

+   





 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] medcv commented on issue #5996: [FLINK-9343] [Example] Add Async Example with External Rest API call

2018-08-09 Thread GitBox
medcv commented on issue #5996: [FLINK-9343] [Example] Add Async Example with 
External Rest API call
URL: https://github.com/apache/flink/pull/5996#issuecomment-411780022
 
 
   @azagrebin Thanks for the review. I will go through the comments and update the PR 
accordingly 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…

2018-08-09 Thread GitBox
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add 
documentation for RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#discussion_r208952554
 
 

 ##
 File path: docs/ops/state/large_state_tuning.md
 ##
 @@ -142,6 +142,17 @@ by default. To enable this feature, users can instantiate 
a `RocksDBStateBackend
 new RocksDBStateBackend(filebackend, true);
 {% endhighlight %}
 
+**RocksDB Timers**
+
+For RocksDB, user can chose whether timers are stored on the heap (default) or 
inside RocksDB. Heap-based timers can have a better performance for smaller 
amounts of
+timers, while storing timers inside RocksDB offers higher scalability as the 
amount of timers in RocksDB can exceed the available main memory (spilling to 
disk).
+
+When using RocksDB as state backend, the type of timer storage can be selected 
through Flink's configuration via option key 
`RocksDBOptions.TIMER_SERVICE_FACTORY`.
 
 Review comment:
   The option classes are mostly internal, right? So a proper YAML would be 
more useful I guess. Can I only set this property for an entire Flink cluster 
or also per-job?
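
   For reference, a hedged sketch of what such a YAML entry could look like (the 
key name is assumed to be the string behind `RocksDBOptions.TIMER_SERVICE_FACTORY`, 
so treat the exact key as an assumption):
   ```yaml
# flink-conf.yaml (key name assumed from RocksDBOptions.TIMER_SERVICE_FACTORY)
state.backend.rocksdb.timer-service.factory: ROCKSDB
   ```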


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…

2018-08-09 Thread GitBox
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add 
documentation for RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#discussion_r208951393
 
 

 ##
 File path: docs/ops/state/large_state_tuning.md
 ##
 @@ -142,6 +142,17 @@ by default. To enable this feature, users can instantiate 
a `RocksDBStateBackend
 new RocksDBStateBackend(filebackend, true);
 {% endhighlight %}
 
+**RocksDB Timers**
+
+For RocksDB, user can chose whether timers are stored on the heap (default) or 
inside RocksDB. Heap-based timers can have a better performance for smaller 
amounts of
 
 Review comment:
   "a user"


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…

2018-08-09 Thread GitBox
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add 
documentation for RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#discussion_r208950482
 
 

 ##
 File path: docs/dev/stream/operators/process_function.md
 ##
 @@ -333,3 +332,43 @@ ctx.timerService.registerEventTimeTimer(coalescedTime)
 {% endhighlight %}
 
 
+
+Timers can also be stopped and removed as follows:
+
+Stopping a processing-time timer:
+
+
+
+{% highlight java %}
+long timestampOfTimerToStop = ...
+ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);
 
 Review comment:
   Are there any performance implications of deleting a timer? I guess not 
anymore right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8

2018-08-09 Thread GitBox
azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify 
clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r208965558
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ##
 @@ -123,6 +124,7 @@ public SimpleConsumerThread(
this.fetchSize = getInt(config, "fetch.message.max.bytes", 
1048576);
this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 
65536);
this.reconnectLimit = getInt(config, 
"flink.simple-consumer-reconnectLimit", 3);
+   this.clientId = config.getProperty("group.id", 
"flink-kafka-consumer-legacy-" + broker.id());
 
 Review comment:
   @maqingxiang 
   Now I see your point, I think it makes sense to have `group.id` as a default 
value for `client.id` but keep them separately configurable, e.g.:
   ```
   String groupId = config.getProperty("group.id", 
"flink-kafka-consumer-legacy-" + broker.id());
   this.clientId = config.getProperty("client.id", groupId);
   ```
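   A hedged sketch of how a user could then set the two properties independently 
(the property names are standard Kafka ones; the fallback chain is the suggestion 
above, not current Flink behaviour):
   ```java
import java.util.Properties;

Properties config = new Properties();
config.setProperty("group.id", "my-flink-group");
// optional: if client.id is absent, the suggested code falls back to group.id
config.setProperty("client.id", "my-flink-client");
   ```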


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on issue #6266: [FLINK-9682] Add setDescription to execution environment and provide description field for the rest api

2018-08-09 Thread GitBox
yanghua commented on issue #6266: [FLINK-9682] Add setDescription to execution 
environment and provide description field for the rest api
URL: https://github.com/apache/flink/pull/6266#issuecomment-411806613
 
 
   cc @tillrohrmann 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] NicoK commented on a change in pull request #6294: [FLINK-9013][docs] Document yarn.containers.vcores only being effective whe…

2018-08-09 Thread GitBox
NicoK commented on a change in pull request #6294: [FLINK-9013][docs] Document 
yarn.containers.vcores only being effective whe…
URL: https://github.com/apache/flink/pull/6294#discussion_r208962025
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java
 ##
 @@ -34,9 +36,22 @@ protected void formatLineBreak(StringBuilder state) {
}
 
@Override
-   protected void formatText(StringBuilder state, String format, String[] 
elements) {
+   protected void formatText(
+   StringBuilder state,
+   String format,
+   String[] elements,
+   EnumSet<TextElement.TextStyle> styles) {
String escapedFormat = escapeCharacters(format);
+
+   StringBuilder prefix = new StringBuilder();
+   StringBuilder sufix = new StringBuilder();
+   if (styles.contains(TextElement.TextStyle.CODE)) {
+   prefix.append("<tt>");
+   sufix.append("</tt>");
+   }
 
 Review comment:
   why use a `StringBuilder` for the `prefix`/`suffix` (also there is a typo in 
suffix)? They only ever contain a single string. This could rather be
   ```
String prefix = "";
String suffix = "";
if (styles.contains(TextElement.TextStyle.CODE)) {
prefix = "<tt>";
suffix = "</tt>";
}
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dawidwys commented on a change in pull request #6294: [FLINK-9013][docs] Document yarn.containers.vcores only being effective whe…

2018-08-09 Thread GitBox
dawidwys commented on a change in pull request #6294: [FLINK-9013][docs] 
Document yarn.containers.vcores only being effective whe…
URL: https://github.com/apache/flink/pull/6294#discussion_r208969123
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java
 ##
 @@ -34,9 +36,22 @@ protected void formatLineBreak(StringBuilder state) {
}
 
@Override
-   protected void formatText(StringBuilder state, String format, String[] 
elements) {
+   protected void formatText(
+   StringBuilder state,
+   String format,
+   String[] elements,
+   EnumSet<TextElement.TextStyle> styles) {
String escapedFormat = escapeCharacters(format);
+
+   StringBuilder prefix = new StringBuilder();
+   StringBuilder sufix = new StringBuilder();
+   if (styles.contains(TextElement.TextStyle.CODE)) {
+   prefix.append("<tt>");
+   sufix.append("</tt>");
+   }
 
 Review comment:
   That would make sense if we had more TextStyles, but as we don't have them 
I've changed it to your code snippet.
   
   Also fixed the typo.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on a change in pull request #6294: [FLINK-9013][docs] Document yarn.containers.vcores only being effective whe…

2018-08-09 Thread GitBox
GJL commented on a change in pull request #6294: [FLINK-9013][docs] Document 
yarn.containers.vcores only being effective whe…
URL: https://github.com/apache/flink/pull/6294#discussion_r208976465
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java
 ##
 @@ -34,9 +36,22 @@ protected void formatLineBreak(StringBuilder state) {
}
 
@Override
-   protected void formatText(StringBuilder state, String format, String[] 
elements) {
+   protected void formatText(
+   StringBuilder state,
+   String format,
+   String[] elements,
+   EnumSet<TextElement.TextStyle> styles) {
String escapedFormat = escapeCharacters(format);
+
+   String prefix = "";
+   String suffix = "";
+   if (styles.contains(TextElement.TextStyle.CODE)) {
+   prefix = "<tt>";
 
 Review comment:
   `<tt>` is not part of the HTML5 standard: 
https://developer.mozilla.org/en-US/docs/Web/HTML/Element/tt


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] asfgit closed pull request #6379: [FLINK-9637] Add public user documentation for state TTL feature

2018-08-09 Thread GitBox
asfgit closed pull request #6379: [FLINK-9637] Add public user documentation 
for state TTL feature
URL: https://github.com/apache/flink/pull/6379
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md
index 44a3653a61f..72fca3e27f1 100644
--- a/docs/dev/stream/state/state.md
+++ b/docs/dev/stream/state/state.md
@@ -266,6 +266,132 @@ a `ValueState`. Once the count reaches 2 it will emit the 
average and clear the
 we start over from `0`. Note that this would keep a different state value for 
each different input
 key if we had tuples with different values in the first field.
 
+### State time-to-live (TTL)
+
+A time-to-live (TTL) can be assigned to the keyed state of any type. 
+In this case it will expire after the configured TTL
+and its stored value will be cleaned up on a best-effort basis, which is 
discussed in more detail later.
+
+The state collection types support per-entry TTLs: list elements and map 
entries expire independently.
+
+To use state TTL you must first build a `StateTtlConfig` object, 
+then TTL functionality can be enabled in any state descriptor by passing this 
configuration:
+
+
+
+{% highlight java %}
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+
+StateTtlConfig ttlConfig = StateTtlConfig
+.newBuilder(Time.seconds(1))
+.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
+.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
+.build();
+
+ValueStateDescriptor stateDescriptor = new 
ValueStateDescriptor<>("text state", String.class);
+stateDescriptor.enableTimeToLive(ttlConfig);
+{% endhighlight %}
+
+
+
+{% highlight scala %}
+import org.apache.flink.api.common.state.StateTtlConfig
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.state.ValueStateDescriptor
+
+val ttlConfig = StateTtlConfig
+.newBuilder(Time.seconds(1))
+.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
+.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
+.build
+
+val stateDescriptor = new ValueStateDescriptor[String]("text state", 
classOf[String])
+stateDescriptor.enableTimeToLive(ttlConfig)
+{% endhighlight %}
+
+
+
+The configuration has several options to consider. 
+The first parameter of the `newBuilder` method is mandatory; it is the 
time-to-live value.
+
+The update type configures when the state TTL is refreshed (default 
`OnCreateAndWrite`):
+
+ - `StateTtlConfig.UpdateType.OnCreateAndWrite` - only on creation and write 
access,
+ - `StateTtlConfig.UpdateType.OnReadAndWrite` - also on read access.
+ 
+The state visibility configures whether the expired value is returned on read 
access 
+if it is not cleaned up yet (default `NeverReturnExpired`):
+
+ - `StateTtlConfig.StateVisibility.NeverReturnExpired` - expired value is 
never returned,
+ - `StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp` - returned if 
still available.
+ 
+ In case of `NeverReturnExpired`, the expired state behaves as if it does not 
exist anymore, 
+ even if it has yet to be removed. The option can be useful for use cases 
+ where data has to become unavailable for read access strictly after the TTL, 
+ e.g. applications working with privacy-sensitive data.
+ 
+The other option, `ReturnExpiredIfNotCleanedUp`, allows returning the expired 
state before its cleanup.
+
+**Notes:** 
+
+- The state backends store the timestamp of last modification along with the 
user value, 
+which means that enabling this feature increases consumption of state storage. 
+The heap state backend stores an additional Java object with a reference to the 
user state object 
+and a primitive long value in memory. The RocksDB state backend adds 8 bytes 
per stored value, list entry or map entry.
+
+- Only TTLs in reference to *processing time* are currently supported.
+
+- Trying to restore state that was previously configured without TTL using a 
TTL-enabled descriptor, or vice versa, 
+will lead to a compatibility failure and a `StateMigrationException`.
+
+- The TTL configuration is not part of checkpoints or savepoints 
+but rather determines how Flink treats state in the currently running job.
+
+ Cleanup of expired state
+
+Currently expired values are always removed when they are read out explicitly, 
+e.g. by calling `ValueState.value()`.
+
+Attention! This means that by default 
if expired state is not read, 
+it won't be removed, possibly leading to ever growing state. This might change 
in future releases. 
+
+Additionally, you can activate cleanup at the moment of taking the full 
state snapshot, which 
+will reduce its 

[GitHub] dawidwys commented on issue #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process

2018-08-09 Thread GitBox
dawidwys commented on issue #6205: [FLINK-9642]Reduce the count to deal with 
state during a CEP process
URL: https://github.com/apache/flink/pull/6205#issuecomment-411788642
 
 
   Hi @Aitozi 
   I found the dependencies between the accessor and the shared buffer a bit too 
bidirectional. Please have a look at my idea of how I would loosen them a bit in this 
branch: https://github.com/dawidwys/flink/tree/pr/6205. It does not compile, 
but you should get the idea of how I would split up those two classes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on a change in pull request #6294: [FLINK-9013][docs] Document yarn.containers.vcores only being effective whe…

2018-08-09 Thread GitBox
GJL commented on a change in pull request #6294: [FLINK-9013][docs] Document 
yarn.containers.vcores only being effective whe…
URL: https://github.com/apache/flink/pull/6294#discussion_r208986466
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java
 ##
 @@ -34,9 +36,22 @@ protected void formatLineBreak(StringBuilder state) {
}
 
@Override
-   protected void formatText(StringBuilder state, String format, String[] 
elements) {
+   protected void formatText(
+   StringBuilder state,
+   String format,
+   String[] elements,
+   EnumSet<TextElement.TextStyle> styles) {
String escapedFormat = escapeCharacters(format);
+
+   String prefix = "";
+   String suffix = "";
+   if (styles.contains(TextElement.TextStyle.CODE)) {
+   prefix = "<tt>";
 
 Review comment:
   What I don't like is that `<tt>` or `<code>` might not get formatted the 
same way as markdown's backticks. Why don't we enable markdown syntax in HTML 
tags? There is a configuration option in Kramdown, the default HTML converter 
in Jekyll:
   
   > By default, kramdown parses all block HTML tags and all XML tags as raw 
HTML blocks. However, this can be configured with the parse_block_html. If this 
is set to true, then syntax parsing in HTML blocks is globally enabled. It is 
also possible to enable/disable syntax parsing on a tag per tag basis using the 
markdown attribute:
   >
   >If an HTML tag has an attribute markdown="0", then the tag is parsed as 
raw HTML block.
   >
   >If an HTML tag has an attribute markdown="1", then the default 
mechanism for parsing syntax in this tag is used.
   >
   >If an HTML tag has an attribute markdown="block", then the content of 
the tag is parsed as block level elements.
   >
   >If an HTML tag has an attribute markdown="span", then the content of 
the tag is parsed as span level elements.
   
   https://kramdown.gettalong.org/syntax.html
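
   For illustration, a hedged sketch of the per-tag opt-in described above 
(standard kramdown syntax; whether Flink's Jekyll setup enables block parsing is 
an assumption):
   ```html
<!-- with markdown="1", kramdown parses the tag's content as markdown -->
<div markdown="1">
Inline markdown such as `backticks` is rendered as code here.
</div>
   ```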
   
   
   
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #6532: [FLINK-10109] Add documentation for StreamingFileSink

2018-08-09 Thread GitBox
twalthr commented on a change in pull request #6532: [FLINK-10109] Add 
documentation for StreamingFileSink
URL: https://github.com/apache/flink/pull/6532#discussion_r208986029
 
 

 ##
 File path: docs/dev/connectors/streamfile_sink.md
 ##
 @@ -0,0 +1,95 @@
+---
+title: "Streaming File Sink"
+nav-title: Streaming File Sink
+nav-parent_id: connectors
+nav-pos: 5
+---
+
+
+This connector provides a Sink that writes partitioned files to filesystems
+supported by the Flink `FileSystem` abstraction. Since in streaming the input
+is potentially infinite, the streaming file sink writes data into buckets. The
+bucketing behaviour is configurable but a useful default is time-based
+bucketing where we start writing a new bucket every hour and thus get
+individual files that each contain a part of the infinite output stream.
+
+Within a bucket, we further split the output into smaller part files based on a
+rolling policy. This is useful to prevent individual bucket files from getting
+too big. This is also configurable but the default policy rolls files based on
+file size and a timeout, i.e. if no new data was written to a part file. 
+
+ Usage
+
+The only required configurations are the base path where we want to output our 
+data and an
+[Encoder](http://flink.apache.org/docs/latest/api/java/org/apache/flink/api/common/serialization/Encoder.html)
+that is used for serializing records to the `OutputStream` for each file.
+
+Basic usage thus looks like this:
+
+
+
+
+{% highlight java %}
+DataStream input = ...;
+
+final StreamingFileSink sink = StreamingFileSink
 
 Review comment:
   Please add imports to code examples. I have been trying to do this recently to avoid user 
confusion. Where is `StreamingFileSink` located, which `Path` are we using, 
etc.?
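
   For illustration, a hedged version of the snippet with imports might look like 
this (assuming the row-format builder API of `StreamingFileSink` in Flink 1.6; 
exact package locations should be double-checked):
   ```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

DataStream<String> input = ...;

// row-encoded sink writing UTF-8 strings under the given base path
StreamingFileSink<String> sink = StreamingFileSink
	.forRowFormat(new Path("/base/path"), new SimpleStringEncoder<String>("UTF-8"))
	.build();

input.addSink(sink);
   ```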


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #6532: [FLINK-10109] Add documentation for StreamingFileSink

2018-08-09 Thread GitBox
twalthr commented on a change in pull request #6532: [FLINK-10109] Add 
documentation for StreamingFileSink
URL: https://github.com/apache/flink/pull/6532#discussion_r208985527
 
 

 ##
 File path: docs/dev/connectors/streamfile_sink.md
 ##
 @@ -0,0 +1,95 @@
+---
+title: "Streaming File Sink"
+nav-title: Streaming File Sink
+nav-parent_id: connectors
+nav-pos: 5
+---
+
+
+This connector provides a Sink that writes partitioned files to filesystems
+supported by the Flink `FileSystem` abstraction. Since in streaming the input
+is potentially infinite, the streaming file sink writes data into buckets. The
+bucketing behaviour is configurable but a useful default is time-based
+bucketing where we start writing a new bucket every hour and thus get
+individual files that each contain a part of the infinite output stream.
+
+Within a bucket, we further split the output into smaller part files based on a
+rolling policy. This is useful to prevent individual bucket files from getting
+too big. This is also configurable but the default policy rolls files based on
+file size and a timeout, i.e. if no new data was written to a part file. 
+
+ Usage
+
+The only required configurations are the base path where we want to output our 
+data and an
+[Encoder](http://flink.apache.org/docs/latest/api/java/org/apache/flink/api/common/serialization/Encoder.html)
 
 Review comment:
   We should link to version-specific class.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…

2018-08-09 Thread GitBox
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add 
documentation for RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#discussion_r208952984
 
 

 ##
 File path: docs/ops/state/large_state_tuning.md
 ##
 @@ -142,6 +142,17 @@ by default. To enable this feature, users can instantiate 
a `RocksDBStateBackend
 new RocksDBStateBackend(filebackend, true);
 {% endhighlight %}
 
+**RocksDB Timers**
+
+For RocksDB, user can chose whether timers are stored on the heap (default) or 
inside RocksDB. Heap-based timers can have a better performance for smaller 
amounts of
+timers, while storing timers inside RocksDB offers higher scalability as the 
amount of timers in RocksDB can exceed the available main memory (spilling to 
disk).
+
+When using RocksDB as state backend, the type of timer storage can be selected 
through Flink's configuration via option key 
`RocksDBOptions.TIMER_SERVICE_FACTORY`.
+Possible choices are `HEAP` (to store timers on the heap, default) and 
`ROCKSDB` (to store timers in RocksDB).
 
 Review comment:
   Usually all YAML values are lower case. These are upper case?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…

2018-08-09 Thread GitBox
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add 
documentation for RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#discussion_r208950851
 
 

 ##
 File path: docs/dev/stream/operators/process_function.md
 ##
 @@ -333,3 +332,43 @@ ctx.timerService.registerEventTimeTimer(coalescedTime)
 {% endhighlight %}
 
 
+
+Timers can also be stopped and removed as follows:
+
+Stopping a processing-time timer:
+
+
+
+{% highlight java %}
+long timestampOfTimerToStop = ...
+ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);
+{% endhighlight %}
+
+
+
+{% highlight scala %}
+val timestampOfTimerToStop = ...
+ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)
+{% endhighlight %}
+
+
+
+Stopping an event-time timer:
+
+
+
+{% highlight java %}
+long timestampOfTimerToStop = ...
+ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop);
+{% endhighlight %}
+
+
+
+{% highlight scala %}
+val timestampOfTimerToStop = ...
+ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)
+{% endhighlight %}
+
+
+
+Note Stopping a timer has no effect if 
no such timer timer with the given timestamp is registered.
 
 Review comment:
   remove duplicate `timer`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…

2018-08-09 Thread GitBox
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add 
documentation for RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#discussion_r208950079
 
 

 ##
 File path: docs/dev/stream/operators/process_function.md
 ##
 @@ -277,19 +277,18 @@ Both types of timers (processing-time and event-time) 
are internally maintained
 
 The `TimerService` deduplicates timers per key and timestamp, i.e., there is 
at most one timer per key and timestamp. If multiple timers are registered for 
the same timestamp, the `onTimer()` method will be called just once.
 
-**Note:** Flink synchronizes invocations of `onTimer()` and 
`processElement()`. Hence, users do not have to worry about concurrent 
modification of state.
+Note Flink synchronizes invocations of 
`onTimer()` and `processElement()`. Hence, users do not have to worry about 
concurrent modification of state.
 
 ### Fault Tolerance
 
 Timers are fault tolerant and checkpointed along with the state of the 
application. 
 In case of a failure recovery or when starting an application from a 
savepoint, the timers are restored.
 
-**Note:** Checkpointed processing-time timers that were supposed to fire 
before their restoration, will fire immediately. 
+Note Checkpointed processing-time timers 
that were supposed to fire before their restoration will fire immediately.
 This might happen when an application recovers from a failure or when it is 
started from a savepoint.
 
-**Note:** Timers are always synchronously checkpointed, regardless of the 
configuration of the state backends. 
-Therefore, a large number of timers can significantly increase checkpointing 
time. 
-See the "Timer Coalescing" section for advice on how to reduce the number of 
timers.
+Note Timers are always asynchronously 
checkpointed, except for the combination of RocksDB backend / with incremental 
snapshots / with heap-based timers (will be resolved with `FLINK-10026`).
 
 Review comment:
   It would be great if you could add more internal knowledge if there is more 
to say. This section is very tiny, but I guess a lot of work went into it, so we 
should document it accordingly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…

2018-08-09 Thread GitBox
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add 
documentation for RocksDB-based timers and st…
URL: https://github.com/apache/flink/pull/6504#discussion_r208949265
 
 

 ##
 File path: docs/dev/stream/operators/process_function.md
 ##
 @@ -277,19 +277,18 @@ Both types of timers (processing-time and event-time) 
are internally maintained
 
 The `TimerService` deduplicates timers per key and timestamp, i.e., there is 
at most one timer per key and timestamp. If multiple timers are registered for 
the same timestamp, the `onTimer()` method will be called just once.
 
 Review comment:
   Can we move this entire `Fault Tolerance` section to 
`dev/stream/state/checkpointing.md` for a new `Timers` section? This is a 
general concept that is not only relevant for `ProcessFunction`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] aljoscha commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8

2018-08-09 Thread GitBox
aljoscha commented on a change in pull request #5304: [FLINK-8290]Modify 
clientId to groupId in flink-connector-kafka-0.8
URL: https://github.com/apache/flink/pull/5304#discussion_r208955188
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
 ##
 @@ -123,6 +124,7 @@ public SimpleConsumerThread(
this.fetchSize = getInt(config, "fetch.message.max.bytes", 
1048576);
this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 
65536);
this.reconnectLimit = getInt(config, 
"flink.simple-consumer-reconnectLimit", 3);
+   this.clientId = config.getProperty("group.id", 
"flink-kafka-consumer-legacy-" + broker.id());
 
 Review comment:
   Why is it not
   ```
   this.clientId = config.getProperty("client.id", 
"flink-kafka-consumer-legacy-" + broker.id());
   ```
   
   that way the client ID would be configurable separately from the group ID. 
As it is in the PR, you can never have a client ID that is different from the 
group ID if you set a `group.id`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL opened a new pull request #6533: [FLINK-9795][mesos, docs] Update Mesos documentation for FLIP-6

2018-08-09 Thread GitBox
GJL opened a new pull request #6533: [FLINK-9795][mesos, docs] Update Mesos 
documentation for FLIP-6
URL: https://github.com/apache/flink/pull/6533
 
 
   ## What is the purpose of the change
   
   *Update Mesos documentation to reflect changes that were introduced with 
FLIP-6.*
   
   cc: @tillrohrmann 
   
   ## Brief change log
   
 - *Remove Configuration parameters section.*
 - *Improve formatting.*
 - *Document which config options are only used in legacy mode.*
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** (the feature 
is the documentation) / docs / JavaDocs / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-09 Thread GitBox
yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method 
to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-411806291
 
 
   cc @tillrohrmann 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on issue #6499: [FLINK-10099] [test] Rework YarnResourceManagerTest

2018-08-09 Thread GitBox
tillrohrmann commented on issue #6499: [FLINK-10099] [test] Rework 
YarnResourceManagerTest
URL: https://github.com/apache/flink/pull/6499#issuecomment-411775562
 
 
   Thanks for addressing our comments @TisonKun. Merging this PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] xueyumusic commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] [table] TimestampDiff table api and sql support

2018-08-09 Thread GitBox
xueyumusic commented on a change in pull request #6282: 
[FLINK-6847][FLINK-6813] [table] TimestampDiff table api and sql support
URL: https://github.com/apache/flink/pull/6282#discussion_r208985576
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
 ##
 @@ -340,3 +343,66 @@ case class DateFormat(timestamp: Expression, format: 
Expression) extends Express
 
   override private[flink] def resultType = STRING_TYPE_INFO
 }
+
+case class TimestampDiff(
+timeIntervalUnit: Expression,
+timestamp1: Expression,
+timestamp2: Expression)
+  extends Expression {
+
+  override private[flink] def children: Seq[Expression] =
+timeIntervalUnit :: timestamp1 :: timestamp2 :: Nil
+
+  override private[flink] def validateInput(): ValidationResult = {
+if (!TypeCheckUtils.isTimePoint(timestamp1.resultType)) {
+  return ValidationFailure(s"TimestampDiff operator requires Temporal 
input, " +
+s"but timestamp1 is of type ${timestamp1.resultType}")
+}
+
+if (!TypeCheckUtils.isTimePoint(timestamp2.resultType)) {
+  return ValidationFailure(s"TimestampDiff operator requires Temporal 
input, " +
+s"but timestamp2 is of type ${timestamp2.resultType}")
+}
+
+timeIntervalUnit match {
+  case SymbolExpression(TimeIntervalUnit.YEAR)
+   | SymbolExpression(TimeIntervalUnit.MONTH)
+   | SymbolExpression(TimeIntervalUnit.DAY)
+   | SymbolExpression(TimeIntervalUnit.HOUR)
+   | SymbolExpression(TimeIntervalUnit.MINUTE)
+   | SymbolExpression(TimeIntervalUnit.SECOND)
+if timestamp1.resultType == SqlTimeTypeInfo.DATE
+  || timestamp1.resultType == SqlTimeTypeInfo.TIMESTAMP
+  || timestamp2.resultType == SqlTimeTypeInfo.DATE
+  || timestamp2.resultType == SqlTimeTypeInfo.TIMESTAMP =>
+ValidationSuccess
+
+  case _ =>
+ValidationFailure(s"TimestampDiff operator does not support unit 
'$timeIntervalUnit'" +
+s" for input of type ('${timestamp1.resultType}', 
'${timestamp2.resultType}').")
+}
+  }
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+val typeFactory = relBuilder
+  .asInstanceOf[FlinkRelBuilder]
+  .getTypeFactory
+
+val intervalUnit = timeIntervalUnit.asInstanceOf[SymbolExpression].symbol
+  .enum.asInstanceOf[TimeUnitRange]
+val intervalType = typeFactory.createSqlIntervalType(
+  new SqlIntervalQualifier(intervalUnit.startUnit, intervalUnit.endUnit, 
SqlParserPos.ZERO))
+
+val rexCall = relBuilder
+  .getRexBuilder
+  .makeCall(intervalType, SqlStdOperatorTable.MINUS_DATE,
 
 Review comment:
   Thanks @twalthr, it seems that if we call 
`SqlStdOperatorTable#TIMESTAMP_DIFF` here, we need to add new code generation for 
the timestampDiff SQL function.
   I will look into the other feedback soon, thank you :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on issue #6539: [FLINK-10123] Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client

2018-08-10 Thread GitBox
yanghua commented on issue #6539: [FLINK-10123] Use ExecutorThreadFactory 
instead of DefaultThreadFactory in RestServer/Client
URL: https://github.com/apache/flink/pull/6539#issuecomment-412244963
 
 
   Till, Scala checkstyle error: 
   
   ```
   error 
file=/home/travis/build/apache/flink/flink-runtime/src/main/scala/akka/actor/RobustActorSystem.scala
 message=File line length exceeds 100 characters line=88
   ```
   
   The other Travis build tasks also failed because of connection timeouts.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream

2018-08-10 Thread GitBox
hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] 
Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r209413346
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
 ##
 @@ -136,4 +136,11 @@ abstract class AggregateFunction[T, ACC] extends 
UserDefinedFunction {
 * accumulator type should be automatically inferred.
 */
   def getAccumulatorType: TypeInformation[ACC] = null
+
+  private[flink] var isDistinctAgg: Boolean = false
+
+  private[flink] def distinct: AggregateFunction[T, ACC] = {
 
 Review comment:
   good catch!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] Aitozi commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process

2018-08-11 Thread GitBox
Aitozi commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209419821
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ##
 @@ -161,40 +100,30 @@ public void init(
eventsCount.putAll(maxIds);
}
 
-   /**
-* Stores given value (value + timestamp) under the given state. It 
assigns a preceding element
-* relation to the previous entry.
-*
-* @param stateName  name of the state that the event should be 
assigned to
-* @param eventIdunique id of event assigned by this 
SharedBuffer
-* @param previousNodeId id of previous entry (might be null if start 
of new run)
-* @param versionVersion of the previous relation
-* @return assigned id of this element
-* @throws Exception Thrown if the system cannot access the state.
-*/
-   public NodeId put(
-   final String stateName,
-   final EventId eventId,
-   @Nullable final NodeId previousNodeId,
-   final DeweyNumber version) throws Exception {
+   public SharedBufferAccessor getAccessor() {
+   return new SharedBufferAccessor<>(this);
+   }
 
-   if (previousNodeId != null) {
-   lockNode(previousNodeId);
+   public void advanceTime(long timestamp) throws Exception {
 
 Review comment:
   This needs to be called from the nfa package, so I think it should be public.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] packet23 opened a new pull request #6540: [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode.

2018-08-11 Thread GitBox
packet23 opened a new pull request #6540: [FLINK-9891] Added hook to shutdown 
cluster if a session was created in per-job mode.
URL: https://github.com/apache/flink/pull/6540
 
 
   ## What is the purpose of the change
   
   This change minimizes the probability of a per-job YARN session cluster surviving 
after the client has exited. It restores the CLI behaviour of Flink 1.4 and below.
   
   
   ## Brief change log
   
 - when a per-job YARN cluster is deployed, a shutdown hook is installed
 - when the shutdown hook is called, it terminates the deployed cluster
 - when the CLI terminates normally, the shutdown hook is called by the CLI and 
removed
 - otherwise, the Java runtime will call the shutdown hook (see the sketch below)
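
   A rough sketch of this install/run/remove pattern (all names are hypothetical, 
not the actual Flink classes):
   ```java
public final class PerJobShutdownHookSketch {

	public static void main(String[] args) {
		Thread shutdownHook = new Thread(PerJobShutdownHookSketch::shutDownCluster);
		Runtime.getRuntime().addShutdownHook(shutdownHook);

		runJob(); // hypothetical: deploy the per-job cluster and wait for the job

		// normal termination: deregister the hook, then run the cleanup ourselves
		Runtime.getRuntime().removeShutdownHook(shutdownHook);
		shutdownHook.run();
		// on abnormal termination (e.g. the client is killed),
		// the Java runtime invokes the registered hook instead
	}

	private static void runJob() { /* hypothetical job submission */ }

	private static void shutDownCluster() { /* hypothetical cluster teardown */ }
}
   ```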
   
   
   ## Verifying this change
   
   We verified the change manually by submitting a DataSet API application to 
Yarn and killing the client before the job terminated.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not documented
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on issue #6003: [FLINK-9289][Dataset] Parallelism of generated operators should have max parallelism of input

2018-08-06 Thread GitBox
fhueske commented on issue #6003: [FLINK-9289][Dataset] Parallelism of 
generated operators should have max parallelism of input
URL: https://github.com/apache/flink/pull/6003#issuecomment-410872065
 
 
   Thanks for the reminder @xccui! 
   I'll try to have a look this week.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StephanEwen opened a new pull request #6507: [FLINK-10069] [docs] Update SSL docs to reflect internal vs. external communication

2018-08-06 Thread GitBox
StephanEwen opened a new pull request #6507:  [FLINK-10069] [docs] Update SSL 
docs to reflect internal vs. external communication
URL: https://github.com/apache/flink/pull/6507
 
 
   This pull request adds the documentation for the updated SSL setup. It 
explains internal and external connectivity, and discusses the configuration 
options.
   
   This also simplifies the described example setups a lot, which is now 
possible because we use dedicated certificates for internal connectivity. Those 
may be treated as a shared secret and thus need not match exact DNS names / IP 
addresses.
   
   The commits should be applied to the `1.6` and `master` branch.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] suez1224 commented on issue #6484: [FLINK-9134] [table] upgrade Calcite dependency to 1.17

2018-08-06 Thread GitBox
suez1224 commented on issue #6484: [FLINK-9134] [table] upgrade Calcite 
dependency to 1.17
URL: https://github.com/apache/flink/pull/6484#issuecomment-410878018
 
 
   merged


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] klion26 commented on issue #6430: [FLINK-8058][Queryable State]Queryable state should check types

2018-08-06 Thread GitBox
klion26 commented on issue #6430: [FLINK-8058][Queryable State]Queryable state 
should check types
URL: https://github.com/apache/flink/pull/6430#issuecomment-410893513
 
 
   Hi @kl0u @twalthr, could you please help review this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr commented on a change in pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider idle connection and multithreads synchronization

2018-08-03 Thread GitBox
walterddr commented on a change in pull request #6301: [FLINK-9794] [jdbc] 
JDBCOutputFormat does not consider idle connection and multithreads 
synchronization
URL: https://github.com/apache/flink/pull/6301#discussion_r207659233
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
 ##
 @@ -107,98 +130,11 @@ private void establishConnection() throws SQLException, 
ClassNotFoundException {
 */
@Override
public void writeRecord(Row row) throws IOException {
-
if (typesArray != null && typesArray.length > 0 && 
typesArray.length != row.getArity()) {
LOG.warn("Column SQL types array doesn't match arity of 
passed Row! Check the passed array...");
}
try {
-
-   if (typesArray == null) {
-   // no types provided
-   for (int index = 0; index < row.getArity(); 
index++) {
-   LOG.warn("Unknown column type for 
column {}. Best effort approach to set its value: {}.", index + 1, 
row.getField(index));
-   upload.setObject(index + 1, 
row.getField(index));
-   }
-   } else {
-   // types provided
-   for (int index = 0; index < row.getArity(); 
index++) {
-
-   if (row.getField(index) == null) {
-   upload.setNull(index + 1, 
typesArray[index]);
-   } else {
-   // casting values as suggested 
by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
-   switch (typesArray[index]) {
-   case 
java.sql.Types.NULL:
-   
upload.setNull(index + 1, typesArray[index]);
-   break;
-   case 
java.sql.Types.BOOLEAN:
-   case java.sql.Types.BIT:
-   
upload.setBoolean(index + 1, (boolean) row.getField(index));
-   break;
-   case 
java.sql.Types.CHAR:
-   case 
java.sql.Types.NCHAR:
-   case 
java.sql.Types.VARCHAR:
-   case 
java.sql.Types.LONGVARCHAR:
-   case 
java.sql.Types.LONGNVARCHAR:
-   
upload.setString(index + 1, (String) row.getField(index));
-   break;
-   case 
java.sql.Types.TINYINT:
-   
upload.setByte(index + 1, (byte) row.getField(index));
-   break;
-   case 
java.sql.Types.SMALLINT:
-   
upload.setShort(index + 1, (short) row.getField(index));
-   break;
-   case 
java.sql.Types.INTEGER:
-   
upload.setInt(index + 1, (int) row.getField(index));
-   break;
-   case 
java.sql.Types.BIGINT:
-   
upload.setLong(index + 1, (long) row.getField(index));
-   break;
-   case 
java.sql.Types.REAL:
-   
upload.setFloat(index + 1, (float) row.getField(index));
-   break;
-   case 
java.sql.Types.FLOAT:
-   case 
java.sql.Types.DOUBLE:
-   
upload.setDouble(index + 1, (double) row.getField(index));
-   break;
- 

[GitHub] walterddr commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL

2018-08-03 Thread GitBox
walterddr commented on a change in pull request #6448: [FLINK-9990] [table] Add 
regex_extract supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448#discussion_r207694473
 
 

 ##
 File path: docs/dev/table/sql.md
 ##
 @@ -1842,6 +1842,16 @@ RPAD(text string, len integer, pad string)
 
   
 {% highlight text %}
+REGEX_EXTRACT(str string, regex string, extractIndex integer)
+{% endhighlight %}
+  
+  
+Returns the string str extracted using specified regex pattern and 
index. If str or regex is null, returns null. E.g. 
REGEX_EXTRACT('foothebar', 'foo(.*?)(bar)', 2) returns 
bar.
 
 Review comment:
   Also, similar to SQL arrays, it might be good to point out that the index starts 
at 1.
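
   A hedged example of the 1-based group indexing (reusing the example from the 
proposed docs):
   ```sql
-- group 1 is '(.*?)', group 2 is '(bar)'; indexing starts at 1
SELECT REGEX_EXTRACT('foothebar', 'foo(.*?)(bar)', 2); -- returns 'bar'
SELECT REGEX_EXTRACT('foothebar', 'foo(.*?)(bar)', 1); -- returns 'the'
   ```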


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL

2018-08-03 Thread GitBox
walterddr commented on a change in pull request #6448: [FLINK-9990] [table] Add 
regex_extract supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448#discussion_r207694127
 
 

 ##
 File path: docs/dev/table/sql.md
 ##
 @@ -1842,6 +1842,16 @@ RPAD(text string, len integer, pad string)
 
   
 {% highlight text %}
+REGEX_EXTRACT(str string, regex string, extractIndex integer)
+{% endhighlight %}
+  
+  
+Returns the string str extracted using specified regex pattern and 
index. If str or regex is null, returns null. E.g. 
REGEX_EXTRACT('foothebar', 'foo(.*?)(bar)', 2) returns 
bar.
 
 Review comment:
   This might be more informative:
   ```
   Return the string extracted from the `extractedIndex` capturing group using 
specified `regex` pattern on input string `str`.
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on issue #6476: [FLINK-10025] Add getCheckpointConfig API for PythonStreamExecutionEnvironment

2018-08-04 Thread GitBox
zentol commented on issue #6476: [FLINK-10025] Add getCheckpointConfig API for 
PythonStreamExecutionEnvironment
URL: https://github.com/apache/flink/pull/6476#issuecomment-410428229
 
 
   This still needs a _python_ test to verify that all properties can be 
properly set from the python side.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol edited a comment on issue #6476: [FLINK-10025] Add getCheckpointConfig API for PythonStreamExecutionEnvironment

2018-08-04 Thread GitBox
zentol edited a comment on issue #6476: [FLINK-10025] Add getCheckpointConfig 
API for PythonStreamExecutionEnvironment
URL: https://github.com/apache/flink/pull/6476#issuecomment-410428229
 
 
   This still needs a _python_ test to verify that all properties can be 
properly set and retrieved from the python side.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on issue #6488: [FLINK-10055] incorrect in-progress file suffix in BucketingSink's java doc

2018-08-04 Thread GitBox
yanghua commented on issue #6488: [FLINK-10055] incorrect in-progress file 
suffix in BucketingSink's java doc
URL: https://github.com/apache/flink/pull/6488#issuecomment-410428669
 
 
   +1 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on issue #6486: [FLINK-10051][tests][sql] Add missing depenendeices for sql client E2E test

2018-08-04 Thread GitBox
yanghua commented on issue #6486: [FLINK-10051][tests][sql] Add missing 
depenendeices for sql client E2E test
URL: https://github.com/apache/flink/pull/6486#issuecomment-410428717
 
 
   +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] suez1224 commented on issue #6484: [FLINK-9134] [table] upgrade Calcite dependency to 1.17

2018-08-04 Thread GitBox
suez1224 commented on issue #6484: [FLINK-9134] [table] upgrade Calcite 
dependency to 1.17
URL: https://github.com/apache/flink/pull/6484#issuecomment-410430674
 
 
   @twalthr addressed the comments. Let me know what you think.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL

2018-08-03 Thread GitBox
yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add 
regex_extract supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448#discussion_r207699091
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ##
 @@ -450,6 +450,40 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   "")
   }
 
+  @Test
+  def testRegexExtract(): Unit = {
 
 Review comment:
   Good point. Here is a problem I hit when writing this test case:
   
   ```scala
   testAllApis(
 "foothebar".regexExtract("foo([\\w]+)", 1), //OK,
 "'foothebar'.regexExtract('foo([w]+)', 1)",  //failed, the 
method got 'foo([\\w]+)' returns "", but if pass 'foo([\\w]+)' would get 
compile error.
 "REGEX_EXTRACT('foothebar', 'foo([w]+)', 1)",//OK, but must 
pass four '\'
 "thebar"
   )
   ```
   
   It seems Flink pre-processes regexes that contain `\xxx`. A few days ago, we also hit this issue when testing `SIMILAR TO` against a regex containing `\d`.
   
   cc @twalthr 
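
   If it helps pin this down: each string-parsing layer consumes one level of backslash escaping, so a pattern that reaches the regex engine as `foo([\w]+)` needs two backslashes in a Scala/Java source literal, and presumably one more level per additional parser, which would explain the four backslashes needed in the SQL string. A small Scala sketch of the first layer (an illustrative assumption, not Flink code):

   ```scala
   import java.util.regex.Pattern

   // The source literal "foo([\\w]+)" is the 10-character string foo([\w]+);
   // the regex engine then reads \w as the word-character class.
   val fromLiteral = "foo([\\w]+)"
   println(fromLiteral) // prints: foo([\w]+)

   val m = Pattern.compile(fromLiteral).matcher("foothebar")
   if (m.find()) println(m.group(1)) // prints: thebar
   ```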


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL

2018-08-03 Thread GitBox
yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add 
regex_extract supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448#discussion_r207699117
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ##
 @@ -450,6 +450,40 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   "")
   }
 
+  @Test
+  def testRegexExtract(): Unit = {
 
 Review comment:
   This test case passes:
   
   ```scala
   testSqlApi("REGEX_EXTRACT('foothebar', 'foo([w]+)', 1)", "thebar")
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] bowenli86 opened a new pull request #6488: [FLINK-10055] incorrect in-progress file suffix in BucketingSink's java doc

2018-08-03 Thread GitBox
bowenli86 opened a new pull request #6488: [FLINK-10055] incorrect in-progress 
file suffix in BucketingSink's java doc
URL: https://github.com/apache/flink/pull/6488
 
 
   ## What is the purpose of the change
   
   
   ## Brief change log
   
   Similar to the Javadoc of `setPendingSuffix()` and `setValidLengthSuffix()`, the Javadoc of `setInProgressSuffix()` should have a `.` in `The default is {@code ".in-progress"}.`
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
   none 
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL

2018-08-03 Thread GitBox
walterddr commented on a change in pull request #6448: [FLINK-9990] [table] Add 
regex_extract supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448#discussion_r207700798
 
 

 ##
 File path: docs/dev/table/sql.md
 ##
 @@ -1842,6 +1842,16 @@ RPAD(text string, len integer, pad string)
 
   
 {% highlight text %}
+REGEX_EXTRACT(str string, regex string, extractIndex integer)
+{% endhighlight %}
+  
+  
+Returns the string str extracted using specified regex pattern and 
index. If str or regex is null, returns null. E.g. 
REGEX_EXTRACT('foothebar', 'foo(.*?)(bar)', 2) returns 
bar.
 
 Review comment:
   Thanks for the explanation. Sorry, I was a bit confused by the wording in the doc. Maybe, similar to Hive, we can add this line to the doc:
   ```
   See docs/api/java/util/regex/Matcher.html for more information
   ```
   This way it is immediately clear. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] suez1224 commented on a change in pull request #6484: [FLINK-9134] [table] upgrade Calcite dependency to 1.17

2018-08-04 Thread GitBox
suez1224 commented on a change in pull request #6484: [FLINK-9134] [table] 
upgrade Calcite dependency to 1.17
URL: https://github.com/apache/flink/pull/6484#discussion_r207701607
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
 ##
 @@ -0,0 +1,5609 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql2rel;
+
+import org.apache.calcite.avatica.util.Spaces;
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptSamplingParameters;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.prepare.RelOptTableImpl;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Collect;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.core.Sample;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.Uncollect;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalMatch;
+import org.apache.calcite.rel.logical.LogicalMinus;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.logical.LogicalValues;
+import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelColumnMapping;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.stream.Delta;
+import org.apache.calcite.rel.stream.LogicalDelta;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCallBinding;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexFieldCollation;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexPatternFieldRef;
+import org.apache.calcite.rex.RexRangeRef;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexSubQuery;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexWindowBound;
+import org.apache.calcite.schema.ColumnStrategy;
+import org.apache.calcite.schema.ModifiableTable;
+import org.apache.calcite.schema.ModifiableView;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.calcite.schema.Wrapper;
+import org.apache.calcite.sql.JoinConditionType;
+import org.apache.calcite.sql.JoinType;
+import org.apache.calcite.sql.SemiJoinType;
+import 

[GitHub] suez1224 commented on a change in pull request #6484: [FLINK-9134] [table] upgrade Calcite dependency to 1.17

2018-08-04 Thread GitBox
suez1224 commented on a change in pull request #6484: [FLINK-9134] [table] 
upgrade Calcite dependency to 1.17
URL: https://github.com/apache/flink/pull/6484#discussion_r207701618
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
 ##
 @@ -20,7 +20,10 @@ package org.apache.flink.table.catalog
 
 import java.util.{Collection => JCollection, Collections => JCollections, 
LinkedHashSet => JLinkedHashSet, Set => JSet}
 
+import com.google.common.collect.ImmutableSet
+import jdk.nashorn.internal.ir.annotations.Immutable
 
 Review comment:
   done


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] suez1224 commented on a change in pull request #6484: [FLINK-9134] [table] upgrade Calcite dependency to 1.17

2018-08-04 Thread GitBox
suez1224 commented on a change in pull request #6484: [FLINK-9134] [table] 
upgrade Calcite dependency to 1.17
URL: https://github.com/apache/flink/pull/6484#discussion_r207701611
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowAggregateReduceFunctionsRule.scala
 ##
 @@ -55,19 +58,12 @@ class WindowAggregateReduceFunctionsRule extends 
AggregateReduceFunctionsRule(
   newAgg))
   }
 
-  override def newCalcRel(
-  relBuilder: RelBuilder,
-  oldAgg: Aggregate,
-  exprs: util.List[RexNode]): Unit = {
-
-// add all named properties of the window to the selection
-val oldWindowAgg = oldAgg.asInstanceOf[LogicalWindowAggregate]
-oldWindowAgg.getNamedProperties.foreach(np => 
exprs.add(relBuilder.field(np.name)))
-
-// create a LogicalCalc that computes the complex aggregates and forwards 
the window properties
-relBuilder.project(exprs, oldAgg.getRowType.getFieldNames)
+  override def newCalcRel(relBuilder: RelBuilder, rowType: RelDataType, exprs: 
util.List[RexNode]) {
 
 Review comment:
   done


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL

2018-08-03 Thread GitBox
walterddr commented on a change in pull request #6448: [FLINK-9990] [table] Add 
regex_extract supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448#discussion_r207699504
 
 

 ##
 File path: docs/dev/table/sql.md
 ##
 @@ -1842,6 +1842,16 @@ RPAD(text string, len integer, pad string)
 
   
 {% highlight text %}
+REGEX_EXTRACT(str string, regex string, extractIndex integer)
+{% endhighlight %}
+  
+  
+Returns the string str extracted using specified regex pattern and 
index. If str or regex is null, returns null. E.g. 
REGEX_EXTRACT('foothebar', 'foo(.*?)(bar)', 2) returns 
bar.
 
 Review comment:
   Good point. so another question is: does `REGEX_EXTRACT` returns an array of 
String similar to how Pattern/Matcher in java does it when extract all 
capturing groups? or is it concatenated? If so, what's the delimiter? (since in 
the code it seems only `String` type is returned.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL

2018-08-03 Thread GitBox
yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add 
regex_extract supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448#discussion_r207699629
 
 

 ##
 File path: docs/dev/table/sql.md
 ##
 @@ -1842,6 +1842,16 @@ RPAD(text string, len integer, pad string)
 
   
 {% highlight text %}
+REGEX_EXTRACT(str string, regex string, extractIndex integer)
+{% endhighlight %}
+  
+  
+Returns the string str extracted using specified regex pattern and 
index. If str or regex is null, returns null. E.g. 
REGEX_EXTRACT('foothebar', 'foo(.*?)(bar)', 2) returns 
bar.
 
 Review comment:
   It just returns the extractIndex-th value of the match group array, a single element rather than an array. Refer to: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF (please search for 'regexp_extract')
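
   For readers, a rough Scala sketch of those single-element semantics on top of `java.util.regex` (a hypothetical helper, not the actual Flink implementation; the no-match behavior below is an arbitrary choice for the sketch):

   ```scala
   import java.util.regex.Pattern

   def regexExtract(str: String, regex: String, extractIndex: Int): String = {
     if (str == null || regex == null) return null
     val m = Pattern.compile(regex).matcher(str)
     // Return only the requested group; "" on no match is an assumption here.
     if (m.find()) m.group(extractIndex) else ""
   }

   println(regexExtract("foothebar", "foo(.*?)(bar)", 2)) // bar
   println(regexExtract("foothebar", "foo(.*?)(bar)", 0)) // foothebar (whole match)
   ```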


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on issue #6476: [FLINK-10025] Add getCheckpointConfig API for PythonStreamExecutionEnvironment

2018-08-04 Thread GitBox
yanghua commented on issue #6476: [FLINK-10025] Add getCheckpointConfig API for 
PythonStreamExecutionEnvironment
URL: https://github.com/apache/flink/pull/6476#issuecomment-410428408
 
 
   OK. Can you review PR #6475 so that I can write a test in Java?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on issue #6484: [FLINK-9134] [table] upgrade Calcite dependency to 1.17

2018-08-04 Thread GitBox
hequn8128 commented on issue #6484: [FLINK-9134] [table] upgrade Calcite 
dependency to 1.17
URL: https://github.com/apache/flink/pull/6484#issuecomment-410431477
 
 
   @suez1224 Hi, thanks for your PR. Only one question from my side: do we have any good ideas or plans for removing `AuxiliaryConverter` now that CALCITE-1761 is fixed?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua edited a comment on issue #6487: [FLINK-10053] Add getParallelism API for PythonStreamExecutionEnvironment

2018-08-03 Thread GitBox
yanghua edited a comment on issue #6487: [FLINK-10053] Add getParallelism API 
for PythonStreamExecutionEnvironment
URL: https://github.com/apache/flink/pull/6487#issuecomment-410422509
 
 
   @zentol the test will be added after PR #6475 has been merged


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL

2018-08-03 Thread GitBox
yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add 
regex_extract supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448#discussion_r207699091
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ##
 @@ -450,6 +450,40 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   "")
   }
 
+  @Test
+  def testRegexExtract(): Unit = {
 
 Review comment:
   Good point. Here is a problem I hit when writing this test case:
   
   ```scala
   testAllApis(
 "foothebar".regexExtract("foo([\\w]+)", 1), //OK,
 "'foothebar'.regexExtract('foo([w]+)', 1)",  //failed, got 
'foo([\\w]+)' returns "", but if pass 'foo([\\w]+)' would get compile error.
 "REGEX_EXTRACT('foothebar', 'foo([w]+)', 1)",//OK, but must pass 
four '\'
 "thebar"
   )
   ```
   
   It seems Flink pre-processes regexes that contain `\xxx`. A few days ago, we also hit this issue when testing `SIMILAR TO` against a regex containing `\d`.
   
   cc @twalthr 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua opened a new pull request #6487: [FLINK-10053] Add getParallelism API for PythonStreamExecutionEnvironment

2018-08-03 Thread GitBox
yanghua opened a new pull request #6487: [FLINK-10053] Add getParallelism API 
for PythonStreamExecutionEnvironment
URL: https://github.com/apache/flink/pull/6487
 
 
   ## What is the purpose of the change
   
   *This pull request adds getParallelism API for 
PythonStreamExecutionEnvironment*
   
   
   ## Brief change log
   
 - *Add getParallelism API for PythonStreamExecutionEnvironment*
   
   ## Verifying this change
   
   TBD
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on issue #6487: [FLINK-10053] Add getParallelism API for PythonStreamExecutionEnvironment

2018-08-03 Thread GitBox
yanghua commented on issue #6487: [FLINK-10053] Add getParallelism API for 
PythonStreamExecutionEnvironment
URL: https://github.com/apache/flink/pull/6487#issuecomment-410422509
 
 
   @zentol the test will be added after the PR : #6475 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] Xpray commented on issue #6465: [FLINK-10008] [table] Improve the LOG function in Table to support bases less than 1

2018-08-03 Thread GitBox
Xpray commented on issue #6465: [FLINK-10008] [table] Improve the LOG function 
in Table to support bases less than 1
URL: https://github.com/apache/flink/pull/6465#issuecomment-410423217
 
 
   @xccui, I've updated the PR. 
   Best.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL

2018-08-04 Thread GitBox
yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add 
regex_extract supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448#discussion_r207701539
 
 

 ##
 File path: docs/dev/table/sql.md
 ##
 @@ -1842,6 +1842,16 @@ RPAD(text string, len integer, pad string)
 
   
 {% highlight text %}
+REGEX_EXTRACT(str string, regex string, extractIndex integer)
+{% endhighlight %}
+  
+  
+Returns the string str extracted using specified regex pattern and 
index. If str or regex is null, returns null. E.g. 
REGEX_EXTRACT('foothebar', 'foo(.*?)(bar)', 2) returns 
bar.
 
 Review comment:
   I agree with this idea, but how would one access the path `docs/api/java/util/regex/Matcher.html`? It's not a complete URL.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] suez1224 commented on a change in pull request #6484: [FLINK-9134] [table] upgrade Calcite dependency to 1.17

2018-08-04 Thread GitBox
suez1224 commented on a change in pull request #6484: [FLINK-9134] [table] 
upgrade Calcite dependency to 1.17
URL: https://github.com/apache/flink/pull/6484#discussion_r207702177
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
 ##
 @@ -161,7 +161,7 @@ class GroupWindowTest extends TableTestBase {
 "rowtime('w$) AS w$rowtime",
 "proctime('w$) AS w$proctime")
 ),
-term("select", "EXPR$0", "DATETIME_PLUS(w$end, 6) AS EXPR$1")
+term("select", "EXPR$0", "+(w$end, 6) AS EXPR$1")
 
 Review comment:
   This is changed in https://issues.apache.org/jira/browse/CALCITE-2188. 
DATETIME_PLUS operator is not equal to PLUS operator, they just somehow share 
the same name "+" in plan after the PR. I can reach out to Calcite community to 
see if it makes sense to revert it back, I agree that using "DATETIME_PLUS" is 
more clear. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL

2018-08-03 Thread GitBox
yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add 
regex_extract supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448#discussion_r207698953
 
 

 ##
 File path: docs/dev/table/sql.md
 ##
 @@ -1842,6 +1842,16 @@ RPAD(text string, len integer, pad string)
 
   
 {% highlight text %}
+REGEX_EXTRACT(str string, regex string, extractIndex integer)
+{% endhighlight %}
+  
+  
+Returns the string str extracted using specified regex pattern and 
index. If str or regex is null, returns null. E.g. 
REGEX_EXTRACT('foothebar', 'foo(.*?)(bar)', 2) returns 
bar.
 
 Review comment:
   For `REGEX_EXTRACT`, you can pass 0, which means extracting the entire match (group 0).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL

2018-08-03 Thread GitBox
yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add 
regex_extract supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448#discussion_r207699091
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ##
 @@ -450,6 +450,40 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   "")
   }
 
+  @Test
+  def testRegexExtract(): Unit = {
 
 Review comment:
   Good point. Here is a problem I hit when writing this test case:
   
   ```scala
   testAllApis(
 "foothebar".regexExtract("foo([\\w]+)", 1), //OK, the 
method got 'foo([\w]+)'
 "'foothebar'.regexExtract('foo([w]+)', 1)",  //failed, the 
method got 'foo([\\w]+)' returns "", but if pass 'foo([\\w]+)' would get 
compile error.
 "REGEX_EXTRACT('foothebar', 'foo([w]+)', 1)",//OK, the method 
got 'foo([\w]+)' but must pass four '\'
 "thebar"
   )
   ```
   
   It seems Flink pre-processes regexes that contain `\xxx`. A few days ago, we also hit this issue when testing `SIMILAR TO` against a regex containing `\d`.
   
   cc @twalthr 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] Xpray commented on a change in pull request #6465: [FLINK-10008] [table] Improve the LOG function in Table to support bases less than 1

2018-08-03 Thread GitBox
Xpray commented on a change in pull request #6465: [FLINK-10008] [table] 
Improve the LOG function in Table to support bases less than 1
URL: https://github.com/apache/flink/pull/6465#discussion_r207699770
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ##
 @@ -103,11 +103,13 @@ object ScalarFunctions {
 if (x <= 0.0) {
   throw new IllegalArgumentException(s"x of 'log(base, x)' must be > 0, 
but x = $x")
 }
-if (base <= 1.0) {
-  throw new IllegalArgumentException(s"base of 'log(base, x)' must be > 1, 
but base = $base")
-} else {
-  Math.log(x) / Math.log(base)
+if (base.compareTo(1.0) == 0) {
+  throw new IllegalArgumentException(s"base of 'log(base, x)' can not be 
1")
+}
+if (base.compareTo(0.0) <= 0) {
 
 Review comment:
   Hi @xccui, you're right, there's no need to call ``compareTo``.
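
   For reference, a sketch of the validation with plain primitive comparisons (illustrative only; the code in the PR is authoritative):

   ```scala
   // log_b(x) = ln(x) / ln(b); valid for base > 0, base != 1, x > 0.
   def log(base: Double, x: Double): Double = {
     if (x <= 0.0) {
       throw new IllegalArgumentException(s"x of 'log(base, x)' must be > 0, but x = $x")
     }
     if (base <= 0.0) {
       throw new IllegalArgumentException(s"base of 'log(base, x)' must be > 0, but base = $base")
     }
     if (base == 1.0) {
       throw new IllegalArgumentException("base of 'log(base, x)' can not be 1")
     }
     math.log(x) / math.log(base)
   }
   ```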


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on issue #6078: [FLINK-9438] Add documentation for AvroDeserializationSchema

2018-08-07 Thread GitBox
tzulitai commented on issue #6078: [FLINK-9438] Add documentation for 
AvroDeserializationSchema
URL: https://github.com/apache/flink/pull/6078#issuecomment-411288894
 
 
   +1, LGTM


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] asfgit closed pull request #6506: [FLINK-10073] [sql-client] Allow setting a restart strategy in SQL Client

2018-08-08 Thread GitBox
asfgit closed pull request #6506: [FLINK-10073] [sql-client] Allow setting a 
restart strategy in SQL Client
URL: https://github.com/apache/flink/pull/6506
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index d35aa591a4d..0e8d2d651b1 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -108,6 +108,8 @@ Greg, 1
 
 Both result modes can be useful during the prototyping of SQL queries.
 
+Attention Queries that are executed in 
a batch environment, can only be retrieved using the `table` result mode.
+
 After a query is defined, it can be submitted to the cluster as a 
long-running, detached Flink job. For this, a target system that stores the 
results needs to be specified using the [INSERT INTO 
statement](sqlClient.html#detached-sql-queries). The [configuration 
section](sqlClient.html#configuration) explains how to declare table sources 
for reading data, how to declare table sinks for writing data, and how to 
configure other table program properties.
 
 {% top %}
@@ -204,6 +206,8 @@ execution:
   max-parallelism: 16   # optional: Flink's maximum parallelism 
(128 by default)
   min-idle-state-retention: 0   # optional: table program's minimum idle 
state time
   max-idle-state-retention: 0   # optional: table program's maximum idle 
state time
+  restart-strategy: # optional: restart strategy
+type: fallback  #   "fallback" to global restart 
strategy by default
 
 # Deployment properties allow for describing the cluster to which table 
programs are submitted to.
 
@@ -227,7 +231,35 @@ Depending on the use case, a configuration can be split 
into multiple files. The
 CLI commands > session environment file > defaults environment file
 {% endhighlight %}
 
-Queries that are executed in a batch environment, can only be retrieved using 
the `table` result mode. 
+ Restart Strategies
+
+Restart strategies control how Flink jobs are restarted in case of a failure. 
Similar to [global restart strategies]({{ site.baseurl 
}}/dev/restart_strategies.html) for a Flink cluster, a more fine-grained 
restart configuration can be declared in an environment file.
+
+The following strategies are supported:
+
+{% highlight yaml %}
+execution:
+  # falls back to the global strategy defined in flink-conf.yaml
+  restart-strategy:
+type: fallback
+
+  # job fails directly and no restart is attempted
+  restart-strategy:
+type: none
+
+  # attempts a given number of times to restart the job
+  restart-strategy:
+type: fixed-delay
+attempts: 3  # retries before job is declared as failed (default: 
Integer.MAX_VALUE)
+delay: 1 # delay in ms between retries (default: 10 s)
+
+  # attempts as long as the maximum number of failures per time interval is 
not exceeded
+  restart-strategy:
+type: failure-rate
+max-failures-per-interval: 1   # retries in interval until failing 
(default: 1)
+failure-rate-interval: 6   # measuring interval in ms for failure rate
+delay: 1   # delay in ms between retries (default: 10 
s)
+{% endhighlight %}
 
 {% top %}
 
diff --git a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml 
b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
index 51e6e95bc1f..8be4ce63ecb 100644
--- a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
@@ -74,6 +74,12 @@ execution:
   min-idle-state-retention: 0
   # maximum idle state retention in ms
   max-idle-state-retention: 0
+  # controls how table programs are restarted in case of a failures
+  restart-strategy:
+# strategy type
+# possible values are "fixed-delay", "failure-rate", "none", or "fallback" 
(default)
+type: fallback
+
 
 #==
 # Deployment properties
diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
index 0d6e6dd59ac..b7c28938121 100644
--- 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
+++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.client.config;
 
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 
 import java.util.Collections;
@@ -57,10 +59,10 @@ 

[GitHub] GJL commented on a change in pull request #6496: [FLINK-10063][tests] Use runit to supervise mesos processes.

2018-08-08 Thread GitBox
GJL commented on a change in pull request #6496: [FLINK-10063][tests] Use runit 
to supervise mesos processes.
URL: https://github.com/apache/flink/pull/6496#discussion_r208471471
 
 

 ##
 File path: flink-jepsen/src/jepsen/flink/mesos.clj
 ##
 @@ -24,11 +24,35 @@
 [jepsen.os.debian :as debian]
 [jepsen.flink.zookeeper :refer [zookeeper-uri]]))
 
+;;; runit process supervisor (http://smarden.org/runit/)
+;;;
+;;; We use runit to supervise Mesos processes because Mesos uses a "fail-fast" 
approach to
+;;; error handling, e.g., the Mesos master will exit when it discovers it has 
been partitioned away
+;;; from the Zookeeper quorum.
+
+(def runit-version "2.1.2-3")
+
+(defn create-supervised-service!
+  "Registers a service with the process supervisor and starts it."
+  [service-name cmd]
+  (let [service-dir (str "/etc/sv/" service-name)
+run-script (str service-dir "/run")]
+(c/su
+  (c/exec :mkdir :-p service-dir)
+  (c/exec :echo (clojure.string/join "\n" ["#!/bin/sh" cmd]) :> run-script)
 
 Review comment:
   I'll fix this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

