[GitHub] fhueske commented on issue #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
fhueske commented on issue #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream URL: https://github.com/apache/flink/pull/6521#issuecomment-411683146 Thanks for the PR @walterddr! I did not have a detailed look, but we would also need to add support for the Java Table API which parses Strings to Expressions. Cheers, Fabian This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dawidwys opened a new pull request #6529: [FLINK-10106][e2e] Included test name in temp directory of e2e test
dawidwys opened a new pull request #6529: [FLINK-10106][e2e] Included test name in temp directory of e2e test URL: https://github.com/apache/flink/pull/6529 Simple improvement for e2e tests debuggability.
[GitHub] zentol commented on a change in pull request #6529: [FLINK-10106][e2e] Included test name in temp directory of e2e test
zentol commented on a change in pull request #6529: [FLINK-10106][e2e] Included test name in temp directory of e2e test
URL: https://github.com/apache/flink/pull/6529#discussion_r208858966

## File path: flink-end-to-end-tests/test-scripts/test-runner-common.sh ##
@@ -29,12 +29,14 @@ function run_test {
     description="$1"
     command="$2"
+    name=$(basename "${command}" | sed -e 's/[_ ]/-/g' | sed -n 's/\.sh//p')

Review comment:
Why not base it on the description instead? This should be more informative, as the script is shared by multiple tests.
[GitHub] tragicjun commented on issue #6508: [Flink-10079] [table] Automatically register sink table from external catalogs
tragicjun commented on issue #6508: [Flink-10079] [table] Automatically register sink table from external catalogs URL: https://github.com/apache/flink/pull/6508#issuecomment-411685212 @twalthr @suez1224 could you please review?
[GitHub] dawidwys commented on issue #6529: [FLINK-10106][e2e] Included test name in temp directory of e2e test
dawidwys commented on issue #6529: [FLINK-10106][e2e] Included test name in temp directory of e2e test URL: https://github.com/apache/flink/pull/6529#issuecomment-411696699 It does not work well when one of the parameters is a URL/URI. Fixing.
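The failure mode mentioned in this comment can be reproduced with a minimal sketch, assuming `name` is derived with the pipeline from the diff under review. The helper `derive_name` and both sample commands are hypothetical and only serve as illustration.

```shell
#!/usr/bin/env bash
# Hypothetical helper wrapping the pipeline from the diff under review.
derive_name() {
    basename "$1" | sed -e 's/[_ ]/-/g' | sed -n 's/\.sh//p'
}

# Plain parameters: basename strips only the script's directory.
derive_name "/tests/test_resume_savepoint.sh 2 2 file true"
# prints "test-resume-savepoint-2-2-file-true"

# URI parameter: basename treats the whole string as one path and keeps only
# what follows the last '/'. The script name is lost, and because nothing
# matches '\.sh', 'sed -n ... p' prints nothing at all.
derive_name "/tests/test_batch_wordcount.sh file:///tmp/input"
# prints nothing
```

One possible way around this, again only an assumption about the eventual fix, would be to apply `basename` to the script path alone before appending the parameters.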
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r208830206

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala ##
@@ -46,6 +46,8 @@ class FlinkRelBuilder(
     relOptCluster,
     relOptSchema) {
+  def getRelOptSchema: RelOptSchema = relOptSchema

Review comment:
Yes, check for `getRelOptSchema` usage in this commit
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r208831842 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalVersionedJoin.scala ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.logical.rel + +import java.util.Collections + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core._ +import org.apache.calcite.rex.{RexBuilder, RexNode} +import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes} +import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind} +import org.apache.flink.util.Preconditions.checkArgument + +/** + * Represents a join between a table and [[org.apache.flink.table.functions.TableVersionFunction]] + * + * @param cluster + * @param traitSet + * @param left stream + * @param right table scan of underlying [[org.apache.flink.table.functions.TableVersionFunction]] + * @param condition must contain [[LogicalVersionedJoin#VERSIONING_JOIN_CONDITION()]] with + * correctly defined references to rightVersioneExpression, + * rightPrimaryKeyExpression and leftVersionExpression. We can not implement + * those references as separate fields, because of problems with Calcite's + * optimization rules like projections push downs, column + * pruning/renaming/reordering, etc. Later rightVersioneExpression, + * rightPrimaryKeyExpression and leftVersionExpression will be extracted from + * the condition. 
+ */ +class LogicalVersionedJoin private ( +cluster: RelOptCluster, +traitSet: RelTraitSet, +left: RelNode, +right: RelNode, +condition: RexNode) + extends Join( +cluster, +traitSet, +left, +right, +condition, +Collections.emptySet().asInstanceOf[java.util.Set[CorrelationId]], +JoinRelType.INNER) { + + override def copy( + traitSet: RelTraitSet, + condition: RexNode, + left: RelNode, + right: RelNode, + joinType: JoinRelType, + semiJoinDone: Boolean): LogicalVersionedJoin = { +checkArgument(joinType == this.getJoinType, + "Can not change join type".asInstanceOf[Object]) +checkArgument(semiJoinDone == this.isSemiJoinDone, + "Can not change semiJoinDone".asInstanceOf[Object]) +new LogicalVersionedJoin( + cluster, + traitSet, + left, + right, + condition) + } +} + +object LogicalVersionedJoin { + /** +* See [[LogicalVersionedJoin#condition]] +*/ + val VERSIONING_JOIN_CONDITION = new SqlFunction(

Review comment:
We want and need to use Calcite's optimisations for joins (like predicate pushdown, column pruning, etc.). To enable that, `LogicalVersionedJoin` extends Calcite's `Join`. However, there is no way to extend `Join`'s semantics so that, besides the actual join condition, it carries the extra fields we need to handle versioning. The only way to do this is to expose `leftVersionExpression`, `rightVersionExpression` and `rightPrimaryKeyExpression` via this `VERSIONING_JOIN_CONDITION`. Otherwise, if we kept those expressions in some hidden state, they could be pruned/renamed/reordered/... .
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r208833159 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToVersionedJoinRule.scala ## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.logical + +import java.util +import java.util.Collections + +import org.apache.calcite.plan.RelOptRule.{any, none, operand, some} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableFunctionScan +import org.apache.calcite.rel.logical.LogicalCorrelate +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlCastFunction +import org.apache.flink.table.api.{Table, ValidationException} +import org.apache.flink.table.functions.TableVersionFunction +import org.apache.flink.table.functions.sql.ProctimeSqlFunction +import org.apache.flink.table.functions.utils.TableSqlFunction +import org.apache.flink.table.plan.logical.rel.LogicalVersionedJoin +import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType +import org.apache.flink.table.plan.util.RexDefaultVisitor +import org.apache.flink.table.typeutils.TypeStringUtils +import org.apache.flink.util.Preconditions.checkState + +class LogicalCorrelateToVersionedJoinRule + extends RelOptRule( +operand(classOf[LogicalCorrelate], + some( +operand(classOf[RelNode], any()), +operand(classOf[TableFunctionScan], none(, +"LogicalCorrelateToVersionedJoinRule") { + + override def onMatch(call: RelOptRuleCall): Unit = { +val logicalCorrelate: LogicalCorrelate = call.rel(0) +val leftNode: RelNode = call.rel(1) +val rightTableFunctionScan: TableFunctionScan = call.rel(2) + +val cluster = logicalCorrelate.getCluster + +new GetTableVersionFunctionCall(cluster.getRexBuilder, leftNode) + .visit(rightTableFunctionScan.getCall) match { + case Some(rightVersionedTableCall) => +val underlyingTableHistory: Table = rightVersionedTableCall.tableVersionFunction.table +val relBuilder = this.relBuilderFactory.create( + cluster, + underlyingTableHistory.relBuilder.getRelOptSchema) +val rexBuilder = cluster.getRexBuilder + +val rightNode: RelNode = 
underlyingTableHistory.logicalPlan.toRelNode(relBuilder) + +val rightVersionExpression = createRightExpression( + rexBuilder, + leftNode, + rightNode, + rightVersionedTableCall.tableVersionFunction.versionField) + +val rightPrimaryKeyExpression = createRightExpression( + rexBuilder, + leftNode, + rightNode, + rightVersionedTableCall.tableVersionFunction.primaryKey) + +relBuilder.push( + if (rightVersionedTableCall.tableVersionFunction.isProctime) { +LogicalVersionedJoin.create( + rexBuilder, + cluster, + logicalCorrelate.getTraitSet, + leftNode, + rightNode, + rightVersionedTableCall.versionExpression, + rightPrimaryKeyExpression) + } + else { +LogicalVersionedJoin.create( + rexBuilder, + cluster, + logicalCorrelate.getTraitSet, + leftNode, + rightNode, + rightVersionedTableCall.versionExpression, + rightVersionExpression, + rightPrimaryKeyExpression) + }) +call.transformTo(relBuilder.build()) + case None => +} + } + + private def createRightExpression( + rexBuilder: RexBuilder, + leftNode: RelNode, + rightNode: RelNode, + field: String): RexNode = { +val rightReferencesOffset = leftNode.getRowType.getFieldCount +val rightDataTypeField = rightNode.getRowType.getField(field, false, false) +rexBuilder.makeInputRef( + rightDataTypeField.getType, rightReferencesOffset + rightDataTypeField.getIndex) + } +} +
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r208833101 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToVersionedJoinRule.scala ## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.logical + +import java.util +import java.util.Collections + +import org.apache.calcite.plan.RelOptRule.{any, none, operand, some} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableFunctionScan +import org.apache.calcite.rel.logical.LogicalCorrelate +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlCastFunction +import org.apache.flink.table.api.{Table, ValidationException} +import org.apache.flink.table.functions.TableVersionFunction +import org.apache.flink.table.functions.sql.ProctimeSqlFunction +import org.apache.flink.table.functions.utils.TableSqlFunction +import org.apache.flink.table.plan.logical.rel.LogicalVersionedJoin +import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType +import org.apache.flink.table.plan.util.RexDefaultVisitor +import org.apache.flink.table.typeutils.TypeStringUtils +import org.apache.flink.util.Preconditions.checkState + +class LogicalCorrelateToVersionedJoinRule + extends RelOptRule( +operand(classOf[LogicalCorrelate], + some( +operand(classOf[RelNode], any()), +operand(classOf[TableFunctionScan], none(, +"LogicalCorrelateToVersionedJoinRule") { + + override def onMatch(call: RelOptRuleCall): Unit = { +val logicalCorrelate: LogicalCorrelate = call.rel(0) +val leftNode: RelNode = call.rel(1) +val rightTableFunctionScan: TableFunctionScan = call.rel(2) + +val cluster = logicalCorrelate.getCluster + +new GetTableVersionFunctionCall(cluster.getRexBuilder, leftNode) + .visit(rightTableFunctionScan.getCall) match { + case Some(rightVersionedTableCall) => +val underlyingTableHistory: Table = rightVersionedTableCall.tableVersionFunction.table +val relBuilder = this.relBuilderFactory.create( + cluster, + underlyingTableHistory.relBuilder.getRelOptSchema) +val rexBuilder = cluster.getRexBuilder + +val rightNode: RelNode = 
underlyingTableHistory.logicalPlan.toRelNode(relBuilder) + +val rightVersionExpression = createRightExpression( + rexBuilder, + leftNode, + rightNode, + rightVersionedTableCall.tableVersionFunction.versionField) + +val rightPrimaryKeyExpression = createRightExpression( + rexBuilder, + leftNode, + rightNode, + rightVersionedTableCall.tableVersionFunction.primaryKey) + +relBuilder.push( + if (rightVersionedTableCall.tableVersionFunction.isProctime) { +LogicalVersionedJoin.create( + rexBuilder, + cluster, + logicalCorrelate.getTraitSet, + leftNode, + rightNode, + rightVersionedTableCall.versionExpression, + rightPrimaryKeyExpression) + } + else { +LogicalVersionedJoin.create( + rexBuilder, + cluster, + logicalCorrelate.getTraitSet, + leftNode, + rightNode, + rightVersionedTableCall.versionExpression, + rightVersionExpression, + rightPrimaryKeyExpression) + }) +call.transformTo(relBuilder.build()) + case None => +} + } + + private def createRightExpression( + rexBuilder: RexBuilder, + leftNode: RelNode, + rightNode: RelNode, + field: String): RexNode = { +val rightReferencesOffset = leftNode.getRowType.getFieldCount +val rightDataTypeField = rightNode.getRowType.getField(field, false, false) +rexBuilder.makeInputRef( + rightDataTypeField.getType, rightReferencesOffset + rightDataTypeField.getIndex) + } +} +
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r208835846

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala ##
@@ -167,28 +167,28 @@ class JoinHarnessTest extends HarnessTestBase {
     testHarness.setProcessingTime(1)
     testHarness.processElement1(new StreamRecord(
-      CRow(Row.of(1L: JLong, "1a1"), change = true), 1))
+      CRow(1L: JLong, "1a1"), 1))

Review comment:
Again, I'm depending on this change in the following commit. What harm does it do here?
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r208835295

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala ##
@@ -103,27 +97,42 @@ class DataStreamJoin(
       tableEnv: StreamTableEnvironment,
       queryConfig: StreamQueryConfig): DataStream[CRow] = {
-    val config = tableEnv.getConfig
-    val returnType = schema.typeInfo
-    val keyPairs = joinInfo.pairs().toList
+    validateKeyTypes()
-    // get the equality keys
-    val leftKeys = ArrayBuffer.empty[Int]
-    val rightKeys = ArrayBuffer.empty[Int]
+    val leftDataStream =

Review comment:
I would be against that. I'm depending on this change in this PR, so it defines a strict order of those two commits. Either I wouldn't publish the actual versioned joins PR until this commit is merged, or the versioned joins PR would look exactly the same as it looks now. Splitting into two PRs adds a lot of overhead and room for a lot of problems with PRs going out of sync. It's also annoying during review, because you see the same code twice and reviewers (same or different ones) are often (almost always?) making half of the comments in one PR and half in the other. Besides, this is already in a separate commit, so it can be reviewed separately. Maybe instead of reviewing the PR all at once, try reviewing it commit by commit?
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r208829502

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ##
@@ -155,6 +158,90 @@ class Table(
     select(withResolvedAggFunctionCall: _*)
   }
+  /**
+    * Creates table version function backed up by this table as a history table.
+    *
+    * @param version field for the [[TableVersionFunction]]. Must points to a time indicator
+    * @param primaryKey field for the [[TableVersionFunction]].
+    * @return [[TableVersionFunction]]
+    */
+  def createTableVersionFunction(
+      version: String,
+      primaryKey: String): TableVersionFunction = {

Review comment:
In the first version, @fhueske and I didn't want to focus on this, and we agreed to do it as a follow-up task.
[GitHub] twalthr opened a new pull request #6528: [FLINK-10107] [e2e] Exclude conflicting SQL JARs from test
twalthr opened a new pull request #6528: [FLINK-10107] [e2e] Exclude conflicting SQL JARs from test
URL: https://github.com/apache/flink/pull/6528

## What is the purpose of the change

This is a temporary solution for fixing the SQL Client end-to-end test for releases.

## Brief change log

- Do not include conflicting SQL JARs in library folder of test.

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[GitHub] dawidwys commented on a change in pull request #6529: [FLINK-10106][e2e] Included test name in temp directory of e2e test
dawidwys commented on a change in pull request #6529: [FLINK-10106][e2e] Included test name in temp directory of e2e test
URL: https://github.com/apache/flink/pull/6529#discussion_r208860682

## File path: flink-end-to-end-tests/test-scripts/test-runner-common.sh ##
@@ -29,12 +29,14 @@ function run_test {
     description="$1"
     command="$2"
+    name=$(basename "${command}" | sed -e 's/[_ ]/-/g' | sed -n 's/\.sh//p')

Review comment:
I chose the script name because the descriptions right now are rather long, usually with a common prefix, plus some of them are still the same for different test runs, e.g. `Local recovery and sticky scheduling end-to-end test`. The command contains the script parameters as well, so it should be even easier to differentiate runs this way rather than by description. Example dir name: `temp-test-resume-savepoint-2-2-file-true-34211378012`
[GitHub] tillrohrmann opened a new pull request #6531: [FLINK-10110][tests] Harden Kafka component shut down for E2E tests
tillrohrmann opened a new pull request #6531: [FLINK-10110][tests] Harden Kafka component shut down for E2E tests
URL: https://github.com/apache/flink/pull/6531

## What is the purpose of the change

Instead of only calling kafka-server-stop.sh and zookeeper-server-stop.sh which can fail due to KAFKA-4931 we also use jps to kill the Kafka and ZooKeeper processes.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
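The fallback described in this PR might look roughly like the following sketch. It is not the PR's actual code: the helpers `pids_for_main_class` and `stop_leftover_processes` are hypothetical, and the sketch assumes that `jps` lists the Kafka broker under the main class name `Kafka` and ZooKeeper under `QuorumPeerMain`.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read jps output ("<pid> <main class>") on stdin and
# print the PIDs whose main class equals the first argument.
pids_for_main_class() {
    awk -v name="$1" '$2 == name { print $1 }'
}

# Assumed usage after the regular stop scripts have been called: kill
# whatever is still running. The main class names are assumptions.
stop_leftover_processes() {
    local main_class pid
    for main_class in Kafka QuorumPeerMain; do
        for pid in $(jps | pids_for_main_class "$main_class"); do
            kill "$pid" || true
        done
    done
}
```

Keeping the filter separate from the `jps` call makes it possible to check the matching logic against canned input without starting any JVM.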
[GitHub] rickhofstede commented on issue #6503: [FLINK-10072] [docs] Syntax and consistency issues in "The Broadcast State Pattern"
rickhofstede commented on issue #6503: [FLINK-10072] [docs] Syntax and consistency issues in "The Broadcast State Pattern" URL: https://github.com/apache/flink/pull/6503#issuecomment-411703560 @yanghua Since you've reviewed my commits, let me please ask you the following. It appears that Travis doesn't like the last two commits in this pull request. This seems odd to me, since they introduce no functional changes to the code at all. Can you comment on what is going wrong there, so that I can fix it?
[GitHub] fhueske opened a new pull request #6527: [hotfix] [docs] Fix ProcessWindowFunction code snippets.
fhueske opened a new pull request #6527: [hotfix] [docs] Fix ProcessWindowFunction code snippets.
URL: https://github.com/apache/flink/pull/6527

## What is the purpose of the change

- Fix the code snippets for the `ProcessWindowFunction` in the documentation

## Brief change log

Problems with the examples:

- Java: Tuple does not take generic parameters (`Tuple`)
- Java: `process()` method must be `public`
- `MyProcessWindowFunction` expects `String` key, but no key extractor is used
- `MyProcessWindowFunction` expects `TimeWindow`, but window assigner is not defined

## Verifying this change

copy and paste code in an IDE.

## Does this pull request potentially affect one of the following parts:

No

## Documentation

Fix for the docs
[GitHub] dawidwys commented on a change in pull request #6529: [FLINK-10106][e2e] Included test name in temp directory of e2e test
dawidwys commented on a change in pull request #6529: [FLINK-10106][e2e] Included test name in temp directory of e2e test
URL: https://github.com/apache/flink/pull/6529#discussion_r208860682

## File path: flink-end-to-end-tests/test-scripts/test-runner-common.sh ##
@@ -29,12 +29,14 @@ function run_test {
     description="$1"
     command="$2"
+    name=$(basename "${command}" | sed -e 's/[_ ]/-/g' | sed -n 's/\.sh//p')

Review comment:
I chose the script name because the descriptions right now are rather long, usually with a long common prefix, plus some of them are still the same for different test runs, e.g. `Local recovery and sticky scheduling end-to-end test`. The command contains the script parameters as well, so it should be even easier to differentiate runs this way rather than by description. Example dir name: `temp-test-resume-savepoint-2-2-file-true-34211378012`
[GitHub] tillrohrmann opened a new pull request #6530: [FLINK-10110][tests] Harden Kafka component shut down for E2E tests
tillrohrmann opened a new pull request #6530: [FLINK-10110][tests] Harden Kafka component shut down for E2E tests
URL: https://github.com/apache/flink/pull/6530

## What is the purpose of the change

Instead of only calling kafka-server-stop.sh and zookeeper-server-stop.sh which can fail due to KAFKA-4931 we also use jps to kill the Kafka and ZooKeeper processes.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
[GitHub] tweise commented on issue #6482: [FLINK-10020] [kinesis] Support recoverable exceptions in listShards.
tweise commented on issue #6482: [FLINK-10020] [kinesis] Support recoverable exceptions in listShards. URL: https://github.com/apache/flink/pull/6482#issuecomment-411622174 @tzulitai PTAL
[GitHub] yanghua commented on a change in pull request #6522: [FLINK-10101][mesos] Add web ui url for mesos.
yanghua commented on a change in pull request #6522: [FLINK-10101][mesos] Add web ui url for mesos.
URL: https://github.com/apache/flink/pull/6522#discussion_r208818211

## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ##
@@ -120,6 +120,9 @@
     /** A local actor system for using the helper actors. */
     private final ActorSystem actorSystem;
+    /** Web url for to show in mesos page.*/
+    @Nullable private final String webUiUrl;

Review comment:
For fields, I recommend moving `@Nullable` up one line.
[GitHub] maqingxiang commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8
maqingxiang commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8 URL: https://github.com/apache/flink/pull/5304#discussion_r208790526 ## File path: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ## @@ -123,6 +124,7 @@ public SimpleConsumerThread( this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576); this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536); this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3); + this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id()); Review comment: Hi @azagrebin, group.id and client.id are two separate concepts with different meanings. My point is that a client id of "flink-kafka-consumer-legacy-" + broker.id() carries no real meaning: consumers of different topics all end up with this same value, so they cannot be told apart, which makes troubleshooting difficult. I would suggest exposing the client id to the user, with the value of group.id as its default. Thanks. Cheers
[GitHub] TisonKun commented on issue #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit
TisonKun commented on issue #6490: [FLINK-10056] [test] Add JobMasterTest#testRequestNextInputSplit URL: https://github.com/apache/flink/pull/6490#issuecomment-411627844 cc @aljoscha @zentol
[GitHub] TisonKun commented on issue #6345: [FLINK-9869][runtime] Send PartitionInfo in batch to Improve perfornance
TisonKun commented on issue #6345: [FLINK-9869][runtime] Send PartitionInfo in batch to Improve perfornance URL: https://github.com/apache/flink/pull/6345#issuecomment-411627672 cc @GJL @twalthr
[GitHub] yanghua edited a comment on issue #6503: [FLINK-10072] [docs] Syntax and consistency issues in "The Broadcast State Pattern"
yanghua edited a comment on issue #6503: [FLINK-10072] [docs] Syntax and consistency issues in "The Broadcast State Pattern" URL: https://github.com/apache/flink/pull/6503#issuecomment-411724977 Hi @rickhofstede, the Travis build error is not caused by your code, so there is no need to worry. The next time you run into a build failure, you can handle it as follows: first, click the Travis failure link to view the failure log. Then work out what caused the failure and whether your change is responsible. If it is, fix it; if not, you can report the issue in JIRA. I have reported an issue for this failure, see FLINK-10111. You can also push an empty commit to make Travis rebuild.
[GitHub] azagrebin commented on a change in pull request #5996: [FLINK-9343] [Example] Add Async Example with External Rest API call
azagrebin commented on a change in pull request #5996: [FLINK-9343] [Example] Add Async Example with External Rest API call URL: https://github.com/apache/flink/pull/5996#discussion_r208905291 ## File path: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncAPIExample.scala ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.examples.async + + +import java.io.{BufferedReader, InputStreamReader} +import java.util.concurrent.TimeUnit + +import org.apache.flink.runtime.concurrent.Executors +import org.apache.flink.streaming.api.scala.{AsyncDataStream, DataStream, StreamExecutionEnvironment, _} +import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture} +import org.apache.http.client.methods.HttpGet +import org.apache.http.impl.client.HttpClientBuilder + +import scala.concurrent.{ExecutionContext, Future} + +object AsyncAPIExample { + + def main(args: Array[String]) { + +val env = StreamExecutionEnvironment.getExecutionEnvironment + +val input : DataStream[Int] = env.addSource(new SimpleSource()) + +val quoteStream: DataStream[(Int, String)] = AsyncDataStream.unorderedWait( + input, + new AsyncQuoteRequest, + 1000, + TimeUnit.MILLISECONDS, + 10) + +quoteStream.print() + +env.execute("Async API job") + } +} + +class AsyncQuoteRequest extends AsyncFunction[Int, (Int, String)] { + + /** The API specific client that can issue concurrent requests with callbacks */ + + /** The context used for the future callbacks */ + implicit lazy val executor: ExecutionContext = ExecutionContext.global + + lazy val client = new Quote() + + override def asyncInvoke(input: Int, resultFuture: ResultFuture[(Int, String)]): Unit = { + + +// issue the asynchronous request, receive a future for the result +val resultFutureRequested: Future[String] = Future { + client.getQuote(input.toString) +} + +// set the callback to be executed once the request by the client is complete +// the callback simply forwards the result to the result future +resultFutureRequested.onSuccess { + case result: String => { +resultFuture.complete(Iterable((input, result))) + } +} + } + + +} + +class Quote { + @throws[Exception] + def getQuote(number: String) : String = { +val url = "http://gturnquist-quoters.cfapps.io/api/" + number +val client = 
HttpClientBuilder.create.build +val request = new HttpGet(url) +val response = client.execute(request) +val rd = new BufferedReader(new InputStreamReader(response.getEntity.getContent)) +rd.readLine Review comment: I think it would be nice to close `client` before returning the result
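The review comment above asks for the client to be closed before the result is returned. The example under review is Scala; the following is only a minimal Java sketch of the same idea using try-with-resources. `FakeClient` and `getQuote` are hypothetical stand-ins (not the Apache HttpClient API), used so the sketch runs without external dependencies.

```java
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.StringReader;

// Hypothetical sketch, not code from the PR: shows a resource being closed
// before the method returns its result.
final class CloseBeforeReturnSketch {

    /** Stand-in for a closeable HTTP client; records whether close() ran. */
    static final class FakeClient implements Closeable {
        boolean closed = false;

        /** Pretends to issue a request and returns a reader over the "response". */
        BufferedReader execute(String url) {
            return new BufferedReader(new StringReader("quote for " + url));
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Reads one line; the reader and then the client are closed when the try block exits. */
    static String getQuote(FakeClient client, String number) throws IOException {
        try (FakeClient c = client; BufferedReader rd = c.execute("/api/" + number)) {
            return rd.readLine();
        }
    }

    public static void main(String[] args) throws IOException {
        FakeClient client = new FakeClient();
        System.out.println(getQuote(client, "5"));
        System.out.println("client closed: " + client.closed);
    }
}
```

In the Scala example a `try`/`finally` around `rd.readLine` that calls `client.close()` would play the same role, assuming the client returned by `HttpClientBuilder.create.build` is closeable.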
[GitHub] azagrebin commented on a change in pull request #5996: [FLINK-9343] [Example] Add Async Example with External Rest API call
azagrebin commented on a change in pull request #5996: [FLINK-9343] [Example] Add Async Example with External Rest API call URL: https://github.com/apache/flink/pull/5996#discussion_r208906401 ## File path: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncAPIExample.scala ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.examples.async + + +import java.io.{BufferedReader, InputStreamReader} +import java.util.concurrent.TimeUnit + +import org.apache.flink.runtime.concurrent.Executors +import org.apache.flink.streaming.api.scala.{AsyncDataStream, DataStream, StreamExecutionEnvironment, _} +import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture} +import org.apache.http.client.methods.HttpGet +import org.apache.http.impl.client.HttpClientBuilder + +import scala.concurrent.{ExecutionContext, Future} + +object AsyncAPIExample { + + def main(args: Array[String]) { + +val env = StreamExecutionEnvironment.getExecutionEnvironment + +val input : DataStream[Int] = env.addSource(new SimpleSource()) + +val quoteStream: DataStream[(Int, String)] = AsyncDataStream.unorderedWait( + input, + new AsyncQuoteRequest, + 1000, + TimeUnit.MILLISECONDS, + 10) + +quoteStream.print() + +env.execute("Async API job") + } +} + +class AsyncQuoteRequest extends AsyncFunction[Int, (Int, String)] { + + /** The API specific client that can issue concurrent requests with callbacks */ + + /** The context used for the future callbacks */ + implicit lazy val executor: ExecutionContext = ExecutionContext.global + + lazy val client = new Quote() + + override def asyncInvoke(input: Int, resultFuture: ResultFuture[(Int, String)]): Unit = { + + +// issue the asynchronous request, receive a future for the result +val resultFutureRequested: Future[String] = Future { + client.getQuote(input.toString) +} + +// set the callback to be executed once the request by the client is complete +// the callback simply forwards the result to the result future +resultFutureRequested.onSuccess { + case result: String => { +resultFuture.complete(Iterable((input, result))) + } +} + } + + +} + +class Quote { Review comment: `QuoteHttpClient`? This is an automated message from the Apache Git Service. 
[GitHub] azagrebin commented on a change in pull request #5996: [FLINK-9343] [Example] Add Async Example with External Rest API call
azagrebin commented on a change in pull request #5996: [FLINK-9343] [Example] Add Async Example with External Rest API call URL: https://github.com/apache/flink/pull/5996#discussion_r208905065 ## File path: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncAPIExample.scala ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.scala.examples.async + + +import java.io.{BufferedReader, InputStreamReader} +import java.util.concurrent.TimeUnit + +import org.apache.flink.runtime.concurrent.Executors +import org.apache.flink.streaming.api.scala.{AsyncDataStream, DataStream, StreamExecutionEnvironment, _} +import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture} +import org.apache.http.client.methods.HttpGet +import org.apache.http.impl.client.HttpClientBuilder + +import scala.concurrent.{ExecutionContext, Future} + +object AsyncAPIExample { + + def main(args: Array[String]) { + +val env = StreamExecutionEnvironment.getExecutionEnvironment + +val input : DataStream[Int] = env.addSource(new SimpleSource()) + +val quoteStream: DataStream[(Int, String)] = AsyncDataStream.unorderedWait( + input, + new AsyncQuoteRequest, + 1000, + TimeUnit.MILLISECONDS, + 10) + +quoteStream.print() + +env.execute("Async API job") + } +} + +class AsyncQuoteRequest extends AsyncFunction[Int, (Int, String)] { + + /** The API specific client that can issue concurrent requests with callbacks */ + + /** The context used for the future callbacks */ + implicit lazy val executor: ExecutionContext = ExecutionContext.global + + lazy val client = new Quote() + + override def asyncInvoke(input: Int, resultFuture: ResultFuture[(Int, String)]): Unit = { + + +// issue the asynchronous request, receive a future for the result +val resultFutureRequested: Future[String] = Future { + client.getQuote(input.toString) +} Review comment: It can be a bit shorter as in `AsyncIOExample`: ``` Future { val result = client.getQuote(input.toString) resultFuture.complete(Iterable((input, result))) } (ExecutionContext.global) ``` then no need for `implicit lazy val executor` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[GitHub] azagrebin commented on a change in pull request #5996: [FLINK-9343] [Example] Add Async Example with External Rest API call
azagrebin commented on a change in pull request #5996: [FLINK-9343] [Example] Add Async Example with External Rest API call URL: https://github.com/apache/flink/pull/5996#discussion_r208904205 ## File path: flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncAPIExample.scala ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.scala.examples.async + + +import java.io.{BufferedReader, InputStreamReader} +import java.util.concurrent.TimeUnit + +import org.apache.flink.runtime.concurrent.Executors +import org.apache.flink.streaming.api.scala.{AsyncDataStream, DataStream, StreamExecutionEnvironment, _} +import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture} +import org.apache.http.client.methods.HttpGet +import org.apache.http.impl.client.HttpClientBuilder + +import scala.concurrent.{ExecutionContext, Future} + +object AsyncAPIExample { Review comment: I would suggest to name it `AsyncHttpCallExample` or `AsyncRestAPIExample` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[GitHub] yanghua commented on issue #6503: [FLINK-10072] [docs] Syntax and consistency issues in "The Broadcast State Pattern"
yanghua commented on issue #6503: [FLINK-10072] [docs] Syntax and consistency issues in "The Broadcast State Pattern" URL: https://github.com/apache/flink/pull/6503#issuecomment-411724977 Hi @rickhofstede, the Travis build error is not caused by your code, so there is no need to worry. The next time you run into a build failure, you can handle it as follows: first, click the Travis failure link to view the failure log. Then work out what caused the failure and whether your change is responsible. If it is, fix it; if not, you can report the issue in JIRA. I have reported an issue for this failure, see FLINK-10111.
[GitHub] dawidwys commented on issue #6511: [FLINK-10085][tests] Update AbstractOperatorRestoreTestBase for 1.5
dawidwys commented on issue #6511: [FLINK-10085][tests] Update AbstractOperatorRestoreTestBase for 1.5 URL: https://github.com/apache/flink/pull/6511#issuecomment-411749740 Do we have any reasonable way to verify those kinds of PRs? Or shall we just look for typos while reviewing them? If the latter, then +1.
[GitHub] tillrohrmann closed pull request #5920: [FLINK-9231] [web] Enable SO_REUSEADDR on listen sockets for WebFront…
tillrohrmann closed pull request #5920: [FLINK-9231] [web] Enable SO_REUSEADDR on listen sockets for WebFront… URL: https://github.com/apache/flink/pull/5920 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java index 740beaee1cb..3d8e36b4f3f 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java @@ -29,6 +29,7 @@ import org.apache.flink.shaded.netty4.io.netty.channel.Channel; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption; import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel; @@ -103,7 +104,8 @@ protected void initChannel(SocketChannel ch) { this.bootstrap .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) - .childHandler(initializer); + .childHandler(initializer) + .option(ChannelOption.SO_REUSEADDR, true); ChannelFuture ch; if (configuredAddress == null) { This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
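The diff above enables `SO_REUSEADDR` on the Netty server bootstrap. For readers unfamiliar with the option, here is a minimal sketch of the same socket option using plain `java.net` instead of Netty. `ReuseAddrSketch` and `openListenSocket` are hypothetical names chosen for illustration and are not part of Flink.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Hypothetical helper, not part of Flink: a plain java.net analogue of
// bootstrap.option(ChannelOption.SO_REUSEADDR, true) from the diff above.
final class ReuseAddrSketch {

    /** Opens a listen socket with SO_REUSEADDR enabled before binding. */
    static ServerSocket openListenSocket(int port) throws IOException {
        ServerSocket socket = new ServerSocket();  // unbound, so options can still be set
        socket.setReuseAddress(true);              // has to happen before bind()
        socket.bind(new InetSocketAddress(port));  // port 0 lets the OS pick a free port
        return socket;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket socket = openListenSocket(0)) {
            System.out.println("SO_REUSEADDR enabled: " + socket.getReuseAddress());
        }
    }
}
```

Setting the option before binding matters: per the `ServerSocket` documentation, the behavior of changing `SO_REUSEADDR` after the socket is bound is not defined.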
[GitHub] tillrohrmann commented on issue #5920: [FLINK-9231] [web] Enable SO_REUSEADDR on listen sockets for WebFront…
tillrohrmann commented on issue #5920: [FLINK-9231] [web] Enable SO_REUSEADDR on listen sockets for WebFront… URL: https://github.com/apache/flink/pull/5920#issuecomment-411731732 I will close this PR since it seems to be inactive. If you still want to work on it @trionesadam please reopen it.
[GitHub] twalthr commented on issue #6379: [FLINK-9637] Add public user documentation for state TTL feature
twalthr commented on issue #6379: [FLINK-9637] Add public user documentation for state TTL feature URL: https://github.com/apache/flink/pull/6379#issuecomment-411756912 Thank you @azagrebin. I will go through the text a last time and merge this...
[GitHub] asfgit closed pull request #6528: [FLINK-10107] [e2e] Exclude conflicting SQL JARs from test
asfgit closed pull request #6528: [FLINK-10107] [e2e] Exclude conflicting SQL JARs from test URL: https://github.com/apache/flink/pull/6528 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml index ec5a0e10360..6e69568939f 100644 --- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml +++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml @@ -153,13 +153,14 @@ under the License. sql-jar jar + org.apache.flink flink-connector-kafka-0.10_${scala.binary.version} @@ -167,13 +168,14 @@ under the License. sql-jar jar + This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] medcv commented on issue #5996: [FLINK-9343] [Example] Add Async Example with External Rest API call
medcv commented on issue #5996: [FLINK-9343] [Example] Add Async Example with External Rest API call URL: https://github.com/apache/flink/pull/5996#issuecomment-411780022 @azagrebin Thanks for the review. I will go through them and update the PR accordingly
[GitHub] twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st… URL: https://github.com/apache/flink/pull/6504#discussion_r208952554 ## File path: docs/ops/state/large_state_tuning.md ## @@ -142,6 +142,17 @@ by default. To enable this feature, users can instantiate a `RocksDBStateBackend new RocksDBStateBackend(filebackend, true); {% endhighlight %} +**RocksDB Timers** + +For RocksDB, user can chose whether timers are stored on the heap (default) or inside RocksDB. Heap-based timers can have a better performance for smaller amounts of +timers, while storing timers inside RocksDB offers higher scalability as the amount of timers in RocksDB can exceed the available main memory (spilling to disk). + +When using RockDB as state backend, the type of timer storage can be selected through Flink's configuration via option key `RocksDBOptions.TIMER_SERVICE_FACTORY`. Review comment: The option classes are mostly internal, right? So a proper YAML would be more useful I guess. Can I only set this property for an entire Flink cluster or also per-job? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st… URL: https://github.com/apache/flink/pull/6504#discussion_r208951393 ## File path: docs/ops/state/large_state_tuning.md ## @@ -142,6 +142,17 @@ by default. To enable this feature, users can instantiate a `RocksDBStateBackend new RocksDBStateBackend(filebackend, true); {% endhighlight %} +**RocksDB Timers** + +For RocksDB, user can chose whether timers are stored on the heap (default) or inside RocksDB. Heap-based timers can have a better performance for smaller amounts of Review comment: "a user"
[GitHub] twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st… URL: https://github.com/apache/flink/pull/6504#discussion_r208950482 ## File path: docs/dev/stream/operators/process_function.md ## @@ -333,3 +332,43 @@ ctx.timerService.registerEventTimeTimer(coalescedTime) {% endhighlight %} + +Timers can also be stopped and removed as follows: + +Stopping a processing-time timer: + + + +{% highlight java %} +long timestampOfTimerToStop = ... +ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop); Review comment: Are there any performance implications of deleting a timer? I guess not anymore right?
[GitHub] azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8
azagrebin commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8 URL: https://github.com/apache/flink/pull/5304#discussion_r208965558 ## File path: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ## @@ -123,6 +124,7 @@ public SimpleConsumerThread( this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576); this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536); this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3); + this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id()); Review comment: @maqingxiang Now I see your point, I think it makes sense to have `group.id` as a default value for `client.id` but keep them separately configurable, e.g.: ``` String groupId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id()); this.clientId = config.getProperty("client.id", groupId); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
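The fallback chain proposed in the comment above can be tried out in isolation with `java.util.Properties`. This is only a sketch under stated assumptions: `ClientIdDefaults` and `resolveClientId` are hypothetical names, and the broker id is passed as a plain `int` rather than read from a Kafka broker object.

```java
import java.util.Properties;

// Hypothetical helper illustrating the suggested defaulting; not the connector's actual code.
final class ClientIdDefaults {

    /**
     * Resolves the client id: an explicit client.id wins, otherwise group.id is used,
     * otherwise a legacy name built from the broker id.
     */
    static String resolveClientId(Properties config, int brokerId) {
        String groupId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + brokerId);
        return config.getProperty("client.id", groupId);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        System.out.println(resolveClientId(config, 7));   // neither key set

        config.setProperty("group.id", "orders-consumers");
        System.out.println(resolveClientId(config, 7));   // only group.id set

        config.setProperty("client.id", "orders-reader-1");
        System.out.println(resolveClientId(config, 7));   // both set, client.id wins
    }
}
```

Keeping the two keys separate means users who already set `group.id` get a meaningful client id without extra configuration, while those who need per-consumer names for troubleshooting can still set `client.id` explicitly.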
[GitHub] yanghua commented on issue #6266: [FLINK-9682] Add setDescription to execution environment and provide description field for the rest api
yanghua commented on issue #6266: [FLINK-9682] Add setDescription to execution environment and provide description field for the rest api URL: https://github.com/apache/flink/pull/6266#issuecomment-411806613 cc @tillrohrmann
[GitHub] NicoK commented on a change in pull request #6294: [FLINK-9013][docs] Document yarn.containers.vcores only being effective whe…
NicoK commented on a change in pull request #6294: [FLINK-9013][docs] Document yarn.containers.vcores only being effective whe… URL: https://github.com/apache/flink/pull/6294#discussion_r208962025 ## File path: flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java ## @@ -34,9 +36,22 @@ protected void formatLineBreak(StringBuilder state) { } @Override - protected void formatText(StringBuilder state, String format, String[] elements) { + protected void formatText( + StringBuilder state, + String format, + String[] elements, + EnumSet styles) { String escapedFormat = escapeCharacters(format); + + StringBuilder prefix = new StringBuilder(); + StringBuilder sufix = new StringBuilder(); + if (styles.contains(TextElement.TextStyle.CODE)) { + prefix.append("<tt>"); + sufix.append("</tt>"); + } Review comment: Why use a `StringBuilder` for the `prefix`/`suffix` (there is also a typo in `sufix`)? They only ever contain a single string. This could rather be ``` String prefix = ""; String suffix = ""; if (styles.contains(TextElement.TextStyle.CODE)) { prefix = "<tt>"; suffix = "</tt>"; } ```
[GitHub] dawidwys commented on a change in pull request #6294: [FLINK-9013][docs] Document yarn.containers.vcores only being effective whe…
dawidwys commented on a change in pull request #6294: [FLINK-9013][docs] Document yarn.containers.vcores only being effective whe… URL: https://github.com/apache/flink/pull/6294#discussion_r208969123 ## File path: flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java ## @@ -34,9 +36,22 @@ protected void formatLineBreak(StringBuilder state) { } @Override - protected void formatText(StringBuilder state, String format, String[] elements) { + protected void formatText( + StringBuilder state, + String format, + String[] elements, + EnumSet styles) { String escapedFormat = escapeCharacters(format); + + StringBuilder prefix = new StringBuilder(); + StringBuilder sufix = new StringBuilder(); + if (styles.contains(TextElement.TextStyle.CODE)) { + prefix.append("<tt>"); + sufix.append("</tt>"); + } Review comment: That would make sense if we had more TextStyles, but as we don't have them, I've changed it to your code snippet. Also fixed the typo.
[GitHub] GJL commented on a change in pull request #6294: [FLINK-9013][docs] Document yarn.containers.vcores only being effective whe…
GJL commented on a change in pull request #6294: [FLINK-9013][docs] Document yarn.containers.vcores only being effective whe… URL: https://github.com/apache/flink/pull/6294#discussion_r208976465 ## File path: flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java ## @@ -34,9 +36,22 @@ protected void formatLineBreak(StringBuilder state) { } @Override - protected void formatText(StringBuilder state, String format, String[] elements) { + protected void formatText( + StringBuilder state, + String format, + String[] elements, + EnumSet styles) { String escapedFormat = escapeCharacters(format); + + String prefix = ""; + String suffix = ""; + if (styles.contains(TextElement.TextStyle.CODE)) { + prefix = "<tt>"; Review comment: `<tt>` is not part of the HTML5 standard: https://developer.mozilla.org/en-US/docs/Web/HTML/Element/tt
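The suggestion in this review thread (plain `String` prefix and suffix chosen from an `EnumSet` of styles) can be sketched in isolation as follows. This is only an illustration, not the actual `HtmlFormatter` code: the class `StyledTextSketch`, its nested `TextStyle` enum and the `formatText` signature are hypothetical stand-ins, and `<code>` is used here merely as one HTML5-valid alternative to `<tt>`.

```java
import java.util.EnumSet;

/** Hypothetical stand-in for the formatter discussed above; not Flink code. */
final class StyledTextSketch {

    /** Hypothetical style enum with a single CODE style, as in the thread. */
    enum TextStyle { CODE }

    /** Wraps already-escaped text in markup chosen by the given styles. */
    static String formatText(String escapedText, EnumSet<TextStyle> styles) {
        // Plain strings suffice because each holds at most one tag.
        String prefix = "";
        String suffix = "";
        if (styles.contains(TextStyle.CODE)) {
            // Assumption: <code> stands in for the obsolete <tt> element.
            prefix = "<code>";
            suffix = "</code>";
        }
        return prefix + escapedText + suffix;
    }

    public static void main(String[] args) {
        System.out.println(formatText("yarn.containers.vcores", EnumSet.of(TextStyle.CODE)));
        System.out.println(formatText("plain text", EnumSet.noneOf(TextStyle.class)));
    }
}
```

If more styles were added later, each could contribute its own prefix and suffix, which is the case where a `StringBuilder` would start to pay off.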
[GitHub] asfgit closed pull request #6379: [FLINK-9637] Add public user documentation for state TTL feature
asfgit closed pull request #6379: [FLINK-9637] Add public user documentation for state TTL feature URL: https://github.com/apache/flink/pull/6379 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md index 44a3653a61f..72fca3e27f1 100644 --- a/docs/dev/stream/state/state.md +++ b/docs/dev/stream/state/state.md @@ -266,6 +266,132 @@ a `ValueState`. Once the count reaches 2 it will emit the average and clear the we start over from `0`. Note that this would keep a different state value for each different input key if we had tuples with different values in the first field. +### State time-to-live (TTL) + +A time-to-live (TTL) can be assigned to the keyed state of any type. +In this case it will expire after the configured TTL +and its stored value will be cleaned up on the best effort basis which is discussed in details later. + +The state collection types support per-entry TTLs: list elements and map entries expire independently. 
+ +To use state TTL you must first build a `StateTtlConfig` object, +then TTL functionality can be enabled in any state descriptor passing this configuration: + + + +{% highlight java %} +import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.state.ValueStateDescriptor; + +StateTtlConfig ttlConfig = StateTtlConfig +.newBuilder(Time.seconds(1)) +.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) +.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) +.build(); + +ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("text state", String.class); +stateDescriptor.enableTimeToLive(ttlConfig); +{% endhighlight %} + + + +{% highlight scala %} +import org.apache.flink.api.common.state.StateTtlConfig +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.state.ValueStateDescriptor + +val ttlConfig = StateTtlConfig +.newBuilder(Time.seconds(1)) +.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) +.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) +.build + +val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String]) +stateDescriptor.enableTimeToLive(ttlConfig) +{% endhighlight %} + + + +The configuration has several options to consider. +The first parameter of `newBuilder` method is mandatory, it is the time-to-live value. + +The update type configures when the state TTL is refreshed (default `OnCreateAndWrite`): + + - `StateTtlConfig.UpdateType.OnCreateAndWrite` - only on creation and write access, + - `StateTtlConfig.UpdateType.OnReadAndWrite` - also on read access. 
+ +The state visibility configures whether the expired value is returned on read access +if it is not cleaned up yet (default `NeverReturnExpired`): + + - `StateTtlConfig.StateVisibility.NeverReturnExpired` - expired value is never returned, + - `StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp` - returned if still available. + + In case of `NeverReturnExpired`, the expired state behaves as if it does not exist anymore, + even if it has yet to be removed. The option can be useful for the use cases + where data has to become unavailable for read access strictly after TTL, + e.g. application working with privacy sensitive data. + +Another option `ReturnExpiredIfNotCleanedUp` allows to return the expired state before its cleanup. + +**Notes:** + +- The state backends store the timestamp of last modification along with the user value, +which means that enabling this feature increases consumption of state storage. +Heap state backend stores an additional Java object with a reference to the user state object +and a primitive long value in memory. The RocksDB state backend adds 8 bytes per stored value, list entry or map entry. + +- Only TTLs in reference to *processing time* are currently supported. + +- Trying to restore state, which was previously configured without TTL, using TTL enabled descriptor or vice versa +will lead to compatibility failure and `StateMigrationException`. + +- The TTL configuration is not part of check- or savepoints +but rather a way how Flink treats it in the currently running job. + + Cleanup of expired state + +Currently expired values are always removed when they are read out explicitly, +e.g. by calling `ValueState.value()`. + +Attention! This means that by default if expired state is not read, +it won't be removed, possibly leading to ever growing state. This might change in future releases. + +Additionally you can activate the cleanup at the moment of taking the full state snapshot which +will reduce its
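The update-type and visibility options described in the quoted documentation can be illustrated with a small toy model. This is a sketch under stated assumptions, not Flink's implementation: the class `TtlValueModel`, its enums and the explicit `now` parameter are all hypothetical, and treating a value as expired once `now - lastAccess >= ttl` is a choice made for this example only.

```java
/** Toy model of the TTL semantics described above; NOT Flink code. */
final class TtlValueModel {

    enum UpdateType { ON_CREATE_AND_WRITE, ON_READ_AND_WRITE }

    enum Visibility { NEVER_RETURN_EXPIRED, RETURN_EXPIRED_IF_NOT_CLEANED_UP }

    private final long ttlMillis;
    private final UpdateType updateType;
    private final Visibility visibility;

    private String value;      // null means "no state stored"
    private long lastAccess;   // timestamp stored next to the user value

    TtlValueModel(long ttlMillis, UpdateType updateType, Visibility visibility) {
        this.ttlMillis = ttlMillis;
        this.updateType = updateType;
        this.visibility = visibility;
    }

    /** A write always refreshes the timestamp. */
    void update(String newValue, long now) {
        value = newValue;
        lastAccess = now;
    }

    /** A read cleans up an expired value and applies the visibility rule. */
    String value(long now) {
        if (value == null) {
            return null;
        }
        boolean expired = now - lastAccess >= ttlMillis; // assumption of this sketch
        if (expired) {
            String stale = value;
            value = null; // expired state is removed when it is read explicitly
            return visibility == Visibility.RETURN_EXPIRED_IF_NOT_CLEANED_UP ? stale : null;
        }
        if (updateType == UpdateType.ON_READ_AND_WRITE) {
            lastAccess = now; // a read also refreshes the TTL
        }
        return value;
    }

    public static void main(String[] args) {
        TtlValueModel state = new TtlValueModel(
            1000, UpdateType.ON_CREATE_AND_WRITE, Visibility.NEVER_RETURN_EXPIRED);
        state.update("hello", 0);
        System.out.println(state.value(500));
        System.out.println(state.value(1000));
    }
}
```

Passing the clock value explicitly keeps the model deterministic; a state that is never read is never cleaned up here either, which mirrors the caveat in the quoted text.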
[GitHub] dawidwys commented on issue #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process
dawidwys commented on issue #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#issuecomment-411788642 Hi @Aitozi, I found the accessor and the shared buffer too tightly coupled in both directions. Please have a look at my idea of how I would loosen that coupling in this branch: https://github.com/dawidwys/flink/tree/pr/6205. It does not compile, but you should get the idea of how I would split up those two classes.
[GitHub] GJL commented on a change in pull request #6294: [FLINK-9013][docs] Document yarn.containers.vcores only being effective whe…
GJL commented on a change in pull request #6294: [FLINK-9013][docs] Document yarn.containers.vcores only being effective whe… URL: https://github.com/apache/flink/pull/6294#discussion_r208986466 ## File path: flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java ## @@ -34,9 +36,22 @@ protected void formatLineBreak(StringBuilder state) { } @Override - protected void formatText(StringBuilder state, String format, String[] elements) { + protected void formatText( + StringBuilder state, + String format, + String[] elements, + EnumSet styles) { String escapedFormat = escapeCharacters(format); + + String prefix = ""; + String suffix = ""; + if (styles.contains(TextElement.TextStyle.CODE)) { + prefix = ""; Review comment: What I don't like is that `` or `` might not get formatted the same way as markdown's backticks. Why don't we enable markdown syntax in HTML tags. There is a configuration option in Kramdown, the default HTML converter in Jekyll: > By default, kramdown parses all block HTML tags and all XML tags as raw HTML blocks. However, this can be configured with the parse_block_html. If this is set to true, then syntax parsing in HTML blocks is globally enabled. It is also possible to enable/disable syntax parsing on a tag per tag basis using the markdown attribute: > >If an HTML tag has an attribute markdown="0", then the tag is parsed as raw HTML block. > >If an HTML tag has an attribute markdown="1", then the default mechanism for parsing syntax in this tag is used. > >If an HTML tag has an attribute markdown="block", then the content of the tag is parsed as block level elements. > >If an HTML tag has an attribute markdown="span", then the content of the tag is parsed as span level elements. https://kramdown.gettalong.org/syntax.html This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[GitHub] twalthr commented on a change in pull request #6532: [FLINK-10109] Add documentation for StreamingFileSink
twalthr commented on a change in pull request #6532: [FLINK-10109] Add documentation for StreamingFileSink URL: https://github.com/apache/flink/pull/6532#discussion_r208986029 ## File path: docs/dev/connectors/streamfile_sink.md ## @@ -0,0 +1,95 @@ +--- +title: "Streaming File Sink" +nav-title: Streaming File Sink +nav-parent_id: connectors +nav-pos: 5 +--- + + +This connector provides a Sink that writes partitioned files to filesystems +supported by the Flink `FileSystem` abstraction. Since in streaming the input +is potentially infinite, the streaming file sink writes data into buckets. The +bucketing behaviour is configurable but a useful default is time-based +bucketing where we start writing a new bucket every hour and thus get +individual files that each contain a part of the infinite output stream. + +Within a bucket, we further split the output into smaller part files based on a +rolling policy. This is useful to prevent individual bucket files from getting +too big. This is also configurable but the default policy rolls files based on +file size and a timeout, i.e if no new data was written to a part file. + + Usage + +The only required configuration are the base path were we want to output our +data and an +[Encoder](http://flink.apache.org/docs/latest/api/java/org/apache/flink/api/common/serialization/Encoder.html) +that is used for serializing records to the `OutputStream` for each file. + +Basic usage thus looks like this: + + + + +{% highlight java %} +DataStream input = ...; + +final StreamingFileSink sink = StreamingFileSink Review comment: Please add imports to code examples. I try to do this recently to avoid user confusion. Where is `StreamingFileSink` located, which `Path` are we using, etc.. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[GitHub] twalthr commented on a change in pull request #6532: [FLINK-10109] Add documentation for StreamingFileSink
twalthr commented on a change in pull request #6532: [FLINK-10109] Add documentation for StreamingFileSink URL: https://github.com/apache/flink/pull/6532#discussion_r208985527 ## File path: docs/dev/connectors/streamfile_sink.md ## @@ -0,0 +1,95 @@ +--- +title: "Streaming File Sink" +nav-title: Streaming File Sink +nav-parent_id: connectors +nav-pos: 5 +--- + + +This connector provides a Sink that writes partitioned files to filesystems +supported by the Flink `FileSystem` abstraction. Since in streaming the input +is potentially infinite, the streaming file sink writes data into buckets. The +bucketing behaviour is configurable but a useful default is time-based +bucketing where we start writing a new bucket every hour and thus get +individual files that each contain a part of the infinite output stream. + +Within a bucket, we further split the output into smaller part files based on a +rolling policy. This is useful to prevent individual bucket files from getting +too big. This is also configurable but the default policy rolls files based on +file size and a timeout, i.e if no new data was written to a part file. + + Usage + +The only required configuration are the base path were we want to output our +data and an +[Encoder](http://flink.apache.org/docs/latest/api/java/org/apache/flink/api/common/serialization/Encoder.html) Review comment: We should link to version-specific class. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st… URL: https://github.com/apache/flink/pull/6504#discussion_r208952984 ## File path: docs/ops/state/large_state_tuning.md ## @@ -142,6 +142,17 @@ by default. To enable this feature, users can instantiate a `RocksDBStateBackend new RocksDBStateBackend(filebackend, true); {% endhighlight %} +**RocksDB Timers** + +For RocksDB, user can chose whether timers are stored on the heap (default) or inside RocksDB. Heap-based timers can have a better performance for smaller amounts of +timers, while storing timers inside RocksDB offers higher scalability as the amount of timers in RocksDB can exceed the available main memory (spilling to disk). + +When using RockDB as state backend, the type of timer storage can be selected through Flink's configuration via option key `RocksDBOptions.TIMER_SERVICE_FACTORY`. +Possible choices are `HEAP` (to store timers on the heap, default) and `ROCKSDB` (to store timers in RocksDB). Review comment: Usually all YAML values are lower case. These are upper case? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st… URL: https://github.com/apache/flink/pull/6504#discussion_r208950851 ## File path: docs/dev/stream/operators/process_function.md ## @@ -333,3 +332,43 @@ ctx.timerService.registerEventTimeTimer(coalescedTime) {% endhighlight %} + +Timers can also be stopped and removed as follows: + +Stopping a processing-time timer: + + + +{% highlight java %} +long timestampOfTimerToStop = ... +ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop); +{% endhighlight %} + + + +{% highlight scala %} +val timestampOfTimerToStop = ... +ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop) +{% endhighlight %} + + + +Stopping an event-time timer: + + + +{% highlight java %} +long timestampOfTimerToStop = ... +ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop); +{% endhighlight %} + + + +{% highlight scala %} +val timestampOfTimerToStop = ... +ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop) +{% endhighlight %} + + + +Note Stopping a timer has no effect if no such timer timer with the given timestamp is registered. Review comment: remove duplicate `timer` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
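The timer behaviour described in the quoted documentation (at most one timer per key and timestamp, and deleting an unregistered timer being a no-op) can be modelled with a plain set. This is a toy sketch, not the Flink `TimerService`: the class `TimerRegistryModel` and its methods are hypothetical names chosen for illustration.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of per-key, per-timestamp timer bookkeeping; NOT Flink code. */
final class TimerRegistryModel {

    // Each timer is identified by the pair (key, timestamp).
    private final Set<Map.Entry<String, Long>> timers = new HashSet<>();

    /** Returns true if a new timer was added, false if it already existed. */
    boolean register(String key, long timestamp) {
        return timers.add(Map.entry(key, timestamp));
    }

    /** Returns true if a timer was removed, false if none was registered. */
    boolean delete(String key, long timestamp) {
        return timers.remove(Map.entry(key, timestamp));
    }

    int size() {
        return timers.size();
    }

    public static void main(String[] args) {
        TimerRegistryModel registry = new TimerRegistryModel();
        registry.register("user-1", 10L);
        registry.register("user-1", 10L); // duplicate, ignored
        registry.delete("user-1", 99L);   // no such timer, no effect
        System.out.println(registry.size());
    }
}
```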
[GitHub] twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st… URL: https://github.com/apache/flink/pull/6504#discussion_r208950079 ## File path: docs/dev/stream/operators/process_function.md ## @@ -277,19 +277,18 @@ Both types of timers (processing-time and event-time) are internally maintained The `TimerService` deduplicates timers per key and timestamp, i.e., there is at most one timer per key and timestamp. If multiple timers are registered for the same timestamp, the `onTimer()` method will be called just once. -**Note:** Flink synchronizes invocations of `onTimer()` and `processElement()`. Hence, users do not have to worry about concurrent modification of state. +Note Flink synchronizes invocations of `onTimer()` and `processElement()`. Hence, users do not have to worry about concurrent modification of state. ### Fault Tolerance Timers are fault tolerant and checkpointed along with the state of the application. In case of a failure recovery or when starting an application from a savepoint, the timers are restored. -**Note:** Checkpointed processing-time timers that were supposed to fire before their restoration, will fire immediately. +Note Checkpointed processing-time timers that were supposed to fire before their restoration, will fire immediately. This might happen when an application recovers from a failure or when it is started from a savepoint. -**Note:** Timers are always synchronously checkpointed, regardless of the configuration of the state backends. -Therefore, a large number of timers can significantly increase checkpointing time. -See the "Timer Coalescing" section for advice on how to reduce the number of timers. +Note Timers are always asynchronously checkpointed, except for the combination of RocksDB backend / with incremental snapshots / with heap-based timers (will be resolved with `FLINK-10026`). 
Review comment: It would be great if you could add more internal knowledge if there is more to say. This section is very tiny, but I guess a lot of work went into it, so we should document it accordingly.
[GitHub] twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st…
twalthr commented on a change in pull request #6504: [FLINK-10068][docs] Add documentation for RocksDB-based timers and st… URL: https://github.com/apache/flink/pull/6504#discussion_r208949265 ## File path: docs/dev/stream/operators/process_function.md ## @@ -277,19 +277,18 @@ Both types of timers (processing-time and event-time) are internally maintained The `TimerService` deduplicates timers per key and timestamp, i.e., there is at most one timer per key and timestamp. If multiple timers are registered for the same timestamp, the `onTimer()` method will be called just once. Review comment: Can we move this entire `Fault Tolerance` section to `dev/stream/state/checkpointing.md` for a new `Timers` section? This is a general concept that is not only relevant for `ProcessFunction`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] aljoscha commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8
aljoscha commented on a change in pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8 URL: https://github.com/apache/flink/pull/5304#discussion_r208955188 ## File path: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java ## @@ -123,6 +124,7 @@ public SimpleConsumerThread( this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576); this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536); this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3); + this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id()); Review comment: Why is it not ``` this.clientId = config.getProperty("client.id", "flink-kafka-consumer-legacy-" + broker.id()); ``` that way the client ID would be configurable separately from the group ID. As it is in the PR, you can never have a client ID that is different from the group ID if you set a `group.id`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
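The difference between the two lookups discussed in this comment can be tried out with `java.util.Properties` alone. The sketch below is only an illustration of the reviewer's suggestion, not the connector code: the class `ClientIdLookup` and the `int brokerId` parameter are hypothetical, while the property keys and the default prefix are taken from the quoted diff.

```java
import java.util.Properties;

/** Illustrates the suggested "client.id" lookup with a fallback; not connector code. */
final class ClientIdLookup {

    /** Uses "client.id" if set, otherwise a default derived from the broker id. */
    static String resolveClientId(Properties config, int brokerId) {
        return config.getProperty("client.id", "flink-kafka-consumer-legacy-" + brokerId);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("group.id", "my-group");
        // Only group.id is set, so the default client ID is used.
        System.out.println(resolveClientId(config, 3));

        config.setProperty("client.id", "my-client");
        // Now the client ID is configured independently of the group ID.
        System.out.println(resolveClientId(config, 3));
    }
}
```

With this lookup, setting `group.id` no longer changes the client ID, which is the separation the reviewer asks for.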
[GitHub] GJL opened a new pull request #6533: [FLINK-9795][mesos, docs] Update Mesos documentation for FLIP-6
GJL opened a new pull request #6533: [FLINK-9795][mesos, docs] Update Mesos documentation for FLIP-6 URL: https://github.com/apache/flink/pull/6533 ## What is the purpose of the change *Update Mesos documentation to reflect changes that were introduced with FLIP-6.* cc: @tillrohrmann ## Brief change log - *Remove Configuration parameters section.* - *Improve formatting.* - *Document which config options are only used in legacy mode.* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** (the feature is the documentation) / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream
yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#issuecomment-411806291 cc @tillrohrmann This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tillrohrmann commented on issue #6499: [FLINK-10099] [test] Rework YarnResourceManagerTest
tillrohrmann commented on issue #6499: [FLINK-10099] [test] Rework YarnResourceManagerTest URL: https://github.com/apache/flink/pull/6499#issuecomment-411775562 Thanks for addressing our comments @TisonKun. Merging this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] xueyumusic commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] [table] TimestampDiff table api and sql support
xueyumusic commented on a change in pull request #6282: [FLINK-6847][FLINK-6813] [table] TimestampDiff table api and sql support URL: https://github.com/apache/flink/pull/6282#discussion_r208985576 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala ## @@ -340,3 +343,66 @@ case class DateFormat(timestamp: Expression, format: Expression) extends Express override private[flink] def resultType = STRING_TYPE_INFO } + +case class TimestampDiff( +timeIntervalUnit: Expression, +timestamp1: Expression, +timestamp2: Expression) + extends Expression { + + override private[flink] def children: Seq[Expression] = +timeIntervalUnit :: timestamp1 :: timestamp2 :: Nil + + override private[flink] def validateInput(): ValidationResult = { +if (!TypeCheckUtils.isTimePoint(timestamp1.resultType)) { + return ValidationFailure(s"TimestampDiff operator requires Temporal input, " + +s"but timestamp1 is of type ${timestamp1.resultType}") +} + +if (!TypeCheckUtils.isTimePoint(timestamp2.resultType)) { + return ValidationFailure(s"TimestampDiff operator requires Temporal input, " + +s"but timestamp2 is of type ${timestamp2.resultType}") +} + +timeIntervalUnit match { + case SymbolExpression(TimeIntervalUnit.YEAR) + | SymbolExpression(TimeIntervalUnit.MONTH) + | SymbolExpression(TimeIntervalUnit.DAY) + | SymbolExpression(TimeIntervalUnit.HOUR) + | SymbolExpression(TimeIntervalUnit.MINUTE) + | SymbolExpression(TimeIntervalUnit.SECOND) +if timestamp1.resultType == SqlTimeTypeInfo.DATE + || timestamp1.resultType == SqlTimeTypeInfo.TIMESTAMP + || timestamp2.resultType == SqlTimeTypeInfo.DATE + || timestamp2.resultType == SqlTimeTypeInfo.TIMESTAMP => +ValidationSuccess + + case _ => +ValidationFailure(s"TimestampDiff operator does not support unit '$timeIntervalUnit'" + +s" for input of type ('${timestamp1.resultType}', '${timestamp2.resultType}').") +} + } + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { +val 
typeFactory = relBuilder + .asInstanceOf[FlinkRelBuilder] + .getTypeFactory + +val intervalUnit = timeIntervalUnit.asInstanceOf[SymbolExpression].symbol + .enum.asInstanceOf[TimeUnitRange] +val intervalType = typeFactory.createSqlIntervalType( + new SqlIntervalQualifier(intervalUnit.startUnit, intervalUnit.endUnit, SqlParserPos.ZERO)) + +val rexCall = relBuilder + .getRexBuilder + .makeCall(intervalType, SqlStdOperatorTable.MINUS_DATE, Review comment: Thanks @twalthr, it seems that if we call `SqlStdOperatorTable#TIMESTAMP_DIFF` here, we need to add new code generation for the timestampDiff SQL function. I will look more closely at the other feedback soon, thank you :)
[GitHub] yanghua commented on issue #6539: [FLINK-10123] Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client
yanghua commented on issue #6539: [FLINK-10123] Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client URL: https://github.com/apache/flink/pull/6539#issuecomment-412244963 Till, there is a Scala checkstyle error: ``` error file=/home/travis/build/apache/flink/flink-runtime/src/main/scala/akka/actor/RobustActorSystem.scala message=File line length exceeds 100 characters line=88 ``` Other Travis build tasks also failed because of connection timeouts.
[GitHub] hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream URL: https://github.com/apache/flink/pull/6521#discussion_r209413346 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala ## @@ -136,4 +136,11 @@ abstract class AggregateFunction[T, ACC] extends UserDefinedFunction { * accumulator type should be automatically inferred. */ def getAccumulatorType: TypeInformation[ACC] = null + + private[flink] var isDistinctAgg: Boolean = false + + private[flink] def distinct: AggregateFunction[T, ACC] = { Review comment: good catch! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Aitozi commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process
Aitozi commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209419821 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ## @@ -161,40 +100,30 @@ public void init( eventsCount.putAll(maxIds); } - /** -* Stores given value (value + timestamp) under the given state. It assigns a preceding element -* relation to the previous entry. -* -* @param stateName name of the state that the event should be assigned to -* @param eventIdunique id of event assigned by this SharedBuffer -* @param previousNodeId id of previous entry (might be null if start of new run) -* @param versionVersion of the previous relation -* @return assigned id of this element -* @throws Exception Thrown if the system cannot access the state. -*/ - public NodeId put( - final String stateName, - final EventId eventId, - @Nullable final NodeId previousNodeId, - final DeweyNumber version) throws Exception { + public SharedBufferAccessor getAccessor() { + return new SharedBufferAccessor<>(this); + } - if (previousNodeId != null) { - lockNode(previousNodeId); + public void advanceTime(long timestamp) throws Exception { Review comment: This needs to be called from the nfa package, so I think it should be public.
[GitHub] packet23 opened a new pull request #6540: [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode.
packet23 opened a new pull request #6540: [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode.
URL: https://github.com/apache/flink/pull/6540

## What is the purpose of the change
This change minimizes the probability of a per-job yarn session cluster surviving after the client has exited. It restores the cli behaviour of Flink 1.4 and below.

## Brief change log
- when a per-job yarn cluster is deployed, a shutdown hook is installed
- when the shutdown hook is called, it terminates the deployed cluster
- when the cli terminates normally, the shutdown hook is called by the cli and deinstalled
- otherwise, the java runtime will call the shutdown hook

## Verifying this change
We verified the change manually by submitting a DataSet API application to Yarn and killing the client before the job terminated.

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not documented
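The hook lifecycle listed in the change log of #6540 can be sketched roughly as follows. This is only an illustration built on the standard `Runtime.addShutdownHook` / `Runtime.removeShutdownHook` calls; `ShutdownHookSketch`, `shutDownCluster` and `runWithHook` are hypothetical names and not the code in the pull request, and the cluster shutdown itself is replaced by a counter.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not the PR's implementation.
class ShutdownHookSketch {

    // Counts how often the (stand-in) cluster shutdown was triggered.
    static final AtomicInteger shutdownCalls = new AtomicInteger();

    // Stand-in for terminating the deployed per-job cluster.
    static void shutDownCluster() {
        shutdownCalls.incrementAndGet();
    }

    // Installs a hook, runs the client action, then shuts down and deinstalls
    // the hook on normal termination. Returns true if the hook was deinstalled.
    static boolean runWithHook(Runnable clientAction) {
        AtomicBoolean done = new AtomicBoolean(false);
        // Guard so the shutdown logic runs at most once, whoever calls it.
        Runnable shutdown = () -> {
            if (done.compareAndSet(false, true)) {
                shutDownCluster();
            }
        };
        Thread hook = new Thread(shutdown, "cluster-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(hook);

        // If this throws or the JVM is killed, the hook stays installed
        // and the java runtime calls it on exit.
        clientAction.run();

        // Normal termination: the client calls the shutdown logic itself ...
        shutdown.run();
        // ... and deinstalls the hook.
        return Runtime.getRuntime().removeShutdownHook(hook);
    }

    public static void main(String[] args) {
        boolean removed = runWithHook(() -> System.out.println("job submitted"));
        System.out.println("hook deinstalled: " + removed);
    }
}
```

The `AtomicBoolean` guard is a design choice of this sketch: it keeps the shutdown idempotent in case the client and the runtime both reach it.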
[GitHub] fhueske commented on issue #6003: [FLINK-9289][Dataset] Parallelism of generated operators should have max parallelism of input
fhueske commented on issue #6003: [FLINK-9289][Dataset] Parallelism of generated operators should have max parallelism of input URL: https://github.com/apache/flink/pull/6003#issuecomment-410872065 Thanks for the reminder @xccui! I'll try to have a look this week.
[GitHub] StephanEwen opened a new pull request #6507: [FLINK-10069] [docs] Update SSL docs to reflect internal vs. external communication
StephanEwen opened a new pull request #6507: [FLINK-10069] [docs] Update SSL docs to reflect internal vs. external communication URL: https://github.com/apache/flink/pull/6507 This pull request adds the documentation for the updated SSL setup. It explains internal and external connectivity, and discusses the configuration options. This also simplifies the described example setups a lot, which is now possible because we use dedicated certificates for internal connectivity. Those may be treated as a shared secret and thus need not match exact DNS names / IP addresses. The commits should be applied to the `1.6` and `master` branches.
[GitHub] suez1224 commented on issue #6484: [FLINK-9134] [table] upgrade Calcite dependency to 1.17
suez1224 commented on issue #6484: [FLINK-9134] [table] upgrade Calcite dependency to 1.17 URL: https://github.com/apache/flink/pull/6484#issuecomment-410878018 merged
[GitHub] klion26 commented on issue #6430: [FLINK-8058][Queryable State]Queryable state should check types
klion26 commented on issue #6430: [FLINK-8058][Queryable State]Queryable state should check types URL: https://github.com/apache/flink/pull/6430#issuecomment-410893513 Hi @kl0u @twalthr, could you please help review this?
[GitHub] walterddr commented on a change in pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider idle connection and multithreads synchronization
walterddr commented on a change in pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider idle connection and multithreads synchronization URL: https://github.com/apache/flink/pull/6301#discussion_r207659233 ## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ## @@ -107,98 +130,11 @@ private void establishConnection() throws SQLException, ClassNotFoundException { */ @Override public void writeRecord(Row row) throws IOException { - if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) { LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array..."); } try { - - if (typesArray == null) { - // no types provided - for (int index = 0; index < row.getArity(); index++) { - LOG.warn("Unknown column type for column {}. Best effort approach to set its value: {}.", index + 1, row.getField(index)); - upload.setObject(index + 1, row.getField(index)); - } - } else { - // types provided - for (int index = 0; index < row.getArity(); index++) { - - if (row.getField(index) == null) { - upload.setNull(index + 1, typesArray[index]); - } else { - // casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html - switch (typesArray[index]) { - case java.sql.Types.NULL: - upload.setNull(index + 1, typesArray[index]); - break; - case java.sql.Types.BOOLEAN: - case java.sql.Types.BIT: - upload.setBoolean(index + 1, (boolean) row.getField(index)); - break; - case java.sql.Types.CHAR: - case java.sql.Types.NCHAR: - case java.sql.Types.VARCHAR: - case java.sql.Types.LONGVARCHAR: - case java.sql.Types.LONGNVARCHAR: - upload.setString(index + 1, (String) row.getField(index)); - break; - case java.sql.Types.TINYINT: - upload.setByte(index + 1, (byte) row.getField(index)); - break; - case java.sql.Types.SMALLINT: - upload.setShort(index + 1, (short) row.getField(index)); - break; - case java.sql.Types.INTEGER: 
- upload.setInt(index + 1, (int) row.getField(index)); - break; - case java.sql.Types.BIGINT: - upload.setLong(index + 1, (long) row.getField(index)); - break; - case java.sql.Types.REAL: - upload.setFloat(index + 1, (float) row.getField(index)); - break; - case java.sql.Types.FLOAT: - case java.sql.Types.DOUBLE: - upload.setDouble(index + 1, (double) row.getField(index)); - break; -
[GitHub] walterddr commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL
walterddr commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448#discussion_r207694473
## File path: docs/dev/table/sql.md ##
@@ -1842,6 +1842,16 @@ RPAD(text string, len integer, pad string)
 {% highlight text %}
+REGEX_EXTRACT(str string, regex string, extractIndex integer)
+{% endhighlight %}
+
+
+Returns the string str extracted using specified regex pattern and index. If str or regex is null, returns null. E.g. REGEX_EXTRACT('foothebar', 'foo(.*?)(bar)', 2) returns bar.
Review comment: Also, similar to SQL arrays, it might be good to point out that the index starts with 1.
[GitHub] walterddr commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL
walterddr commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448#discussion_r207694127
## File path: docs/dev/table/sql.md ##
@@ -1842,6 +1842,16 @@ RPAD(text string, len integer, pad string)
 {% highlight text %}
+REGEX_EXTRACT(str string, regex string, extractIndex integer)
+{% endhighlight %}
+
+
+Returns the string str extracted using specified regex pattern and index. If str or regex is null, returns null. E.g. REGEX_EXTRACT('foothebar', 'foo(.*?)(bar)', 2) returns bar.
Review comment: This might be more informative:
```
Return the string extracted from the `extractIndex` capturing group using specified `regex` pattern on input string `str`.
```
[GitHub] zentol commented on issue #6476: [FLINK-10025] Add getCheckpointConfig API for PythonStreamExecutionEnvironment
zentol commented on issue #6476: [FLINK-10025] Add getCheckpointConfig API for PythonStreamExecutionEnvironment URL: https://github.com/apache/flink/pull/6476#issuecomment-410428229 This still needs a _python_ test to verify that all properties can be properly set from the python side.
[GitHub] zentol edited a comment on issue #6476: [FLINK-10025] Add getCheckpointConfig API for PythonStreamExecutionEnvironment
zentol edited a comment on issue #6476: [FLINK-10025] Add getCheckpointConfig API for PythonStreamExecutionEnvironment URL: https://github.com/apache/flink/pull/6476#issuecomment-410428229 This still needs a _python_ test to verify that all properties can be properly set and retrieved from the python side.
[GitHub] yanghua commented on issue #6488: [FLINK-10055] incorrect in-progress file suffix in BucketingSink's java doc
yanghua commented on issue #6488: [FLINK-10055] incorrect in-progress file suffix in BucketingSink's java doc URL: https://github.com/apache/flink/pull/6488#issuecomment-410428669 +1
[GitHub] yanghua commented on issue #6486: [FLINK-10051][tests][sql] Add missing depenendeices for sql client E2E test
yanghua commented on issue #6486: [FLINK-10051][tests][sql] Add missing depenendeices for sql client E2E test URL: https://github.com/apache/flink/pull/6486#issuecomment-410428717 +1
[GitHub] suez1224 commented on issue #6484: [FLINK-9134] [table] upgrade Calcite dependency to 1.17
suez1224 commented on issue #6484: [FLINK-9134] [table] upgrade Calcite dependency to 1.17 URL: https://github.com/apache/flink/pull/6484#issuecomment-410430674 @twalthr addressed the comments. Let me know what you think.
[GitHub] yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL
yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448#discussion_r207699091
## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala ##
@@ -450,6 +450,40 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
 "")
 }
+ @Test
+ def testRegexExtract(): Unit = {
Review comment: Good point. There is a problem here; I wrote this case to test it:
```scala
testAllApis(
  "foothebar".regexExtract("foo([\\w]+)", 1), // OK
  "'foothebar'.regexExtract('foo([w]+)', 1)", // failed: the method got 'foo([\\w]+)' and returned "", but passing 'foo([\\w]+)' gives a compile error
  "REGEX_EXTRACT('foothebar', 'foo([w]+)', 1)", // OK, but four '\' must be passed
  "thebar"
)
```
It seems Flink pre-processes a regex that contains `\xxx`. A few days ago we also hit this issue when testing `similar to` against a regex containing `\d`. cc @twalthr
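For reference, the escaping levels discussed in this comment can be reproduced with plain `java.util.regex`, independent of Flink. The sketch below only shows how the number of backslashes that finally reach the regex engine changes the result; `RegexEscapingSketch` and `firstGroup` are hypothetical names, and how Flink's expression parser or SQL parser unescapes string literals is not modelled here.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: plain java.util.regex, no Flink involved.
class RegexEscapingSketch {

    // Returns capture group 1 of the first match, or "" when nothing matches.
    static String firstGroup(String input, String regex) {
        Matcher m = Pattern.compile(regex).matcher(input);
        return m.find() ? m.group(1) : "";
    }

    public static void main(String[] args) {
        // Java source "foo([\\w]+)" is the regex foo([\w]+): one or more word characters.
        System.out.println(firstGroup("foothebar", "foo([\\w]+)"));

        // Java source "foo([\\\\w]+)" is the regex foo([\\w]+): the class now
        // holds a literal backslash and the letter 'w', so nothing matches.
        System.out.println(firstGroup("foothebar", "foo([\\\\w]+)").isEmpty());

        // With no backslash at all the class holds only the letter 'w'.
        System.out.println(firstGroup("foothebar", "foo([w]+)").isEmpty());
    }
}
```

The second case corresponds to the situation reported above, where the method received a doubled backslash and returned an empty string.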
[GitHub] yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL
yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448#discussion_r207699117
## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala ##
@@ -450,6 +450,40 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
 "")
 }
+ @Test
+ def testRegexExtract(): Unit = {
Review comment: This test case passes:
```scala
testSqlApi("REGEX_EXTRACT('foothebar', 'foo([w]+)', 1)", "thebar")
```
[GitHub] bowenli86 opened a new pull request #6488: [FLINK-10055] incorrect in-progress file suffix in BucketingSink's java doc
bowenli86 opened a new pull request #6488: [FLINK-10055] incorrect in-progress file suffix in BucketingSink's java doc
URL: https://github.com/apache/flink/pull/6488

## What is the purpose of the change

## Brief change log
Similar to the java doc of `setPendingSuffix()` and `setValidLengthSuffix()`, the java doc of `setInProgressSuffix()` should have a `.` in `The default is {@code ".in-progress"}.`

## Verifying this change
This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:
none

## Documentation
- Does this pull request introduce a new feature? (no)
[GitHub] walterddr commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL
walterddr commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448#discussion_r207700798
## File path: docs/dev/table/sql.md ##
@@ -1842,6 +1842,16 @@ RPAD(text string, len integer, pad string)
 {% highlight text %}
+REGEX_EXTRACT(str string, regex string, extractIndex integer)
+{% endhighlight %}
+
+
+Returns the string str extracted using specified regex pattern and index. If str or regex is null, returns null. E.g. REGEX_EXTRACT('foothebar', 'foo(.*?)(bar)', 2) returns bar.
Review comment: Thanks for the explanation. Sorry, I was a bit confused by the wording in the doc. Maybe, similar to Hive, we can add this line to the doc:
```
See docs/api/java/util/regex/Matcher.html for more information
```
That way it is immediately clear. What do you think?
[GitHub] suez1224 commented on a change in pull request #6484: [FLINK-9134] [table] upgrade Calcite dependency to 1.17
suez1224 commented on a change in pull request #6484: [FLINK-9134] [table] upgrade Calcite dependency to 1.17 URL: https://github.com/apache/flink/pull/6484#discussion_r207701607 ## File path: flink-libraries/flink-table/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java ## @@ -0,0 +1,5609 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.sql2rel; + +import org.apache.calcite.avatica.util.Spaces; +import org.apache.calcite.linq4j.Ord; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptSamplingParameters; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.prepare.RelOptTableImpl; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rel.SingleRel; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.Collect; +import org.apache.calcite.rel.core.CorrelationId; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinInfo; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rel.core.Sample; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.core.Uncollect; +import org.apache.calcite.rel.core.Values; +import org.apache.calcite.rel.logical.LogicalAggregate; +import org.apache.calcite.rel.logical.LogicalCorrelate; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.logical.LogicalIntersect; +import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.calcite.rel.logical.LogicalMatch; +import org.apache.calcite.rel.logical.LogicalMinus; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.logical.LogicalSort; +import 
org.apache.calcite.rel.logical.LogicalTableFunctionScan; +import org.apache.calcite.rel.logical.LogicalTableModify; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.calcite.rel.logical.LogicalUnion; +import org.apache.calcite.rel.logical.LogicalValues; +import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider; +import org.apache.calcite.rel.metadata.RelColumnMapping; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.stream.Delta; +import org.apache.calcite.rel.stream.LogicalDelta; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexCallBinding; +import org.apache.calcite.rex.RexCorrelVariable; +import org.apache.calcite.rex.RexDynamicParam; +import org.apache.calcite.rex.RexFieldAccess; +import org.apache.calcite.rex.RexFieldCollation; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexPatternFieldRef; +import org.apache.calcite.rex.RexRangeRef; +import org.apache.calcite.rex.RexShuttle; +import org.apache.calcite.rex.RexSubQuery; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.rex.RexWindowBound; +import org.apache.calcite.schema.ColumnStrategy; +import org.apache.calcite.schema.ModifiableTable; +import org.apache.calcite.schema.ModifiableView; +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.TranslatableTable; +import org.apache.calcite.schema.Wrapper; +import org.apache.calcite.sql.JoinConditionType; +import org.apache.calcite.sql.JoinType; +import org.apache.calcite.sql.SemiJoinType; +import
[GitHub] suez1224 commented on a change in pull request #6484: [FLINK-9134] [table] upgrade Calcite dependency to 1.17
suez1224 commented on a change in pull request #6484: [FLINK-9134] [table] upgrade Calcite dependency to 1.17
URL: https://github.com/apache/flink/pull/6484#discussion_r207701618
## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala ##
@@ -20,7 +20,10 @@ package org.apache.flink.table.catalog
 import java.util.{Collection => JCollection, Collections => JCollections, LinkedHashSet => JLinkedHashSet, Set => JSet}
+import com.google.common.collect.ImmutableSet
+import jdk.nashorn.internal.ir.annotations.Immutable
Review comment: done
[GitHub] suez1224 commented on a change in pull request #6484: [FLINK-9134] [table] upgrade Calcite dependency to 1.17
suez1224 commented on a change in pull request #6484: [FLINK-9134] [table] upgrade Calcite dependency to 1.17
URL: https://github.com/apache/flink/pull/6484#discussion_r207701611
## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowAggregateReduceFunctionsRule.scala ##
@@ -55,19 +58,12 @@ class WindowAggregateReduceFunctionsRule extends AggregateReduceFunctionsRule(
 newAgg))
 }
- override def newCalcRel(
- relBuilder: RelBuilder,
- oldAgg: Aggregate,
- exprs: util.List[RexNode]): Unit = {
-
-// add all named properties of the window to the selection
-val oldWindowAgg = oldAgg.asInstanceOf[LogicalWindowAggregate]
-oldWindowAgg.getNamedProperties.foreach(np => exprs.add(relBuilder.field(np.name)))
-
-// create a LogicalCalc that computes the complex aggregates and forwards the window properties
-relBuilder.project(exprs, oldAgg.getRowType.getFieldNames)
+ override def newCalcRel(relBuilder: RelBuilder, rowType: RelDataType, exprs: util.List[RexNode]) {
Review comment: done
[GitHub] walterddr commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL
walterddr commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448#discussion_r207699504
## File path: docs/dev/table/sql.md ##
@@ -1842,6 +1842,16 @@ RPAD(text string, len integer, pad string)
 {% highlight text %}
+REGEX_EXTRACT(str string, regex string, extractIndex integer)
+{% endhighlight %}
+
+
+Returns the string str extracted using specified regex pattern and index. If str or regex is null, returns null. E.g. REGEX_EXTRACT('foothebar', 'foo(.*?)(bar)', 2) returns bar.
Review comment: Good point. So another question is: does `REGEX_EXTRACT` return an array of String, similar to how Pattern/Matcher in Java does when extracting all capturing groups? Or is the result concatenated? If so, what's the delimiter? (In the code it seems only the `String` type is returned.)
[GitHub] yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL
yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448#discussion_r207699629
## File path: docs/dev/table/sql.md ##
@@ -1842,6 +1842,16 @@ RPAD(text string, len integer, pad string)
 {% highlight text %}
+REGEX_EXTRACT(str string, regex string, extractIndex integer)
+{% endhighlight %}
+
+
+Returns the string str extracted using specified regex pattern and index. If str or regex is null, returns null. E.g. REGEX_EXTRACT('foothebar', 'foo(.*?)(bar)', 2) returns bar.
Review comment: It just returns the extractIndex-th value of the match group array: a single element, not an array. Refer to https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF (please search for 'regex_extract').
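The behaviour described in this comment can be illustrated with `java.util.regex.Matcher#group(int)`. This is a minimal sketch under the assumption that REGEX_EXTRACT maps directly onto the first match of the pattern; `RegexExtractSketch` and `regexExtract` are hypothetical names and not the code in the PR. Returning an empty string when nothing matches is an assumption taken from the test comment elsewhere in this thread, and the handling of an out-of-range index is likewise assumed.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch, not Flink's implementation.
class RegexExtractSketch {

    static String regexExtract(String str, String regex, int extractIndex) {
        // Documented behaviour: a null str or regex yields null.
        if (str == null || regex == null) {
            return null;
        }
        Matcher m = Pattern.compile(regex).matcher(str);
        // Assumption: no match, or an index outside the available groups, yields "".
        if (!m.find() || extractIndex < 0 || extractIndex > m.groupCount()) {
            return "";
        }
        // A single String is returned: the extractIndex-th group of the first match.
        return m.group(extractIndex);
    }

    public static void main(String[] args) {
        System.out.println(regexExtract("foothebar", "foo(.*?)(bar)", 2)); // bar
        System.out.println(regexExtract("foothebar", "foo(.*?)(bar)", 1)); // the
        System.out.println(regexExtract("foothebar", "foo(.*?)(bar)", 0)); // foothebar
    }
}
```

In `Matcher`, group 0 is the whole match and the capturing groups are numbered from 1, which is the point about the index raised earlier in the review.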
[GitHub] yanghua commented on issue #6476: [FLINK-10025] Add getCheckpointConfig API for PythonStreamExecutionEnvironment
yanghua commented on issue #6476: [FLINK-10025] Add getCheckpointConfig API for PythonStreamExecutionEnvironment URL: https://github.com/apache/flink/pull/6476#issuecomment-410428408 OK, can you review PR #6475 so that I can write a test with Java?
[GitHub] hequn8128 commented on issue #6484: [FLINK-9134] [table] upgrade Calcite dependency to 1.17
hequn8128 commented on issue #6484: [FLINK-9134] [table] upgrade Calcite dependency to 1.17 URL: https://github.com/apache/flink/pull/6484#issuecomment-410431477 @suez1224 Hi, thanks for your PR. Only one question from my side: do we have any good ideas or plans for how to remove `AuxiliaryConverter` now that calcite-1761 is fixed?
[GitHub] yanghua edited a comment on issue #6487: [FLINK-10053] Add getParallelism API for PythonStreamExecutionEnvironment
yanghua edited a comment on issue #6487: [FLINK-10053] Add getParallelism API for PythonStreamExecutionEnvironment URL: https://github.com/apache/flink/pull/6487#issuecomment-410422509 @zentol the test will be added after PR #6475 has been merged
[GitHub] yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL
yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448#discussion_r207699091
## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala ##
@@ -450,6 +450,40 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
 "")
 }
+ @Test
+ def testRegexExtract(): Unit = {
Review comment: Good point. There is a problem here; I wrote this case to test it:
```scala
testAllApis(
  "foothebar".regexExtract("foo([\\w]+)", 1), // OK
  "'foothebar'.regexExtract('foo([w]+)', 1)", // failed: got 'foo([\\w]+)' and returned "", but passing 'foo([\\w]+)' gives a compile error
  "REGEX_EXTRACT('foothebar', 'foo([w]+)', 1)", // OK, but four '\' must be passed
  "thebar"
)
```
It seems Flink pre-processes a regex that contains `\xxx`. A few days ago we also hit this issue when testing `similar to` against a regex containing `\d`. cc @twalthr
[GitHub] yanghua opened a new pull request #6487: [FLINK-10053] Add getParallelism API for PythonStreamExecutionEnvironment
yanghua opened a new pull request #6487: [FLINK-10053] Add getParallelism API for PythonStreamExecutionEnvironment URL: https://github.com/apache/flink/pull/6487

## What is the purpose of the change

*This pull request adds getParallelism API for PythonStreamExecutionEnvironment*

## Brief change log

- *Add getParallelism API for PythonStreamExecutionEnvironment*

## Verifying this change

TBD

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)
[GitHub] yanghua commented on issue #6487: [FLINK-10053] Add getParallelism API for PythonStreamExecutionEnvironment
yanghua commented on issue #6487: [FLINK-10053] Add getParallelism API for PythonStreamExecutionEnvironment URL: https://github.com/apache/flink/pull/6487#issuecomment-410422509 @zentol the test will be added after PR #6475.
[GitHub] Xpray commented on issue #6465: [FLINK-10008] [table] Improve the LOG function in Table to support bases less than 1
Xpray commented on issue #6465: [FLINK-10008] [table] Improve the LOG function in Table to support bases less than 1 URL: https://github.com/apache/flink/pull/6465#issuecomment-410423217 @xccui, I've updated the PR. Best.
[GitHub] yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL
yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL URL: https://github.com/apache/flink/pull/6448#discussion_r207701539
## File path: docs/dev/table/sql.md ##
@@ -1842,6 +1842,16 @@ RPAD(text string, len integer, pad string)
 {% highlight text %}
+REGEX_EXTRACT(str string, regex string, extractIndex integer)
+{% endhighlight %}
+
+
+Returns the string str extracted using specified regex pattern and index. If str or regex is null, returns null. E.g. REGEX_EXTRACT('foothebar', 'foo(.*?)(bar)', 2) returns bar.
Review comment: I agree with this idea, but how can the path `docs/api/java/util/regex/Matcher.html` be accessed? It is not a complete URL.
[GitHub] suez1224 commented on a change in pull request #6484: [FLINK-9134] [table] upgrade Calcite dependency to 1.17
suez1224 commented on a change in pull request #6484: [FLINK-9134] [table] upgrade Calcite dependency to 1.17 URL: https://github.com/apache/flink/pull/6484#discussion_r207702177
## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala ##
@@ -161,7 +161,7 @@ class GroupWindowTest extends TableTestBase {
 "rowtime('w$) AS w$rowtime",
 "proctime('w$) AS w$proctime")
 ),
-term("select", "EXPR$0", "DATETIME_PLUS(w$end, 6) AS EXPR$1")
+term("select", "EXPR$0", "+(w$end, 6) AS EXPR$1")
Review comment: This was changed in https://issues.apache.org/jira/browse/CALCITE-2188. The DATETIME_PLUS operator is not the same as the PLUS operator; after that change they merely share the name "+" in the plan. I can reach out to the Calcite community to see whether it makes sense to revert it. I agree that using "DATETIME_PLUS" is clearer.
[GitHub] yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL
yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL URL: https://github.com/apache/flink/pull/6448#discussion_r207698953
## File path: docs/dev/table/sql.md ##
@@ -1842,6 +1842,16 @@ RPAD(text string, len integer, pad string)
 {% highlight text %}
+REGEX_EXTRACT(str string, regex string, extractIndex integer)
+{% endhighlight %}
+
+
+Returns the string str extracted using specified regex pattern and index. If str or regex is null, returns null. E.g. REGEX_EXTRACT('foothebar', 'foo(.*?)(bar)', 2) returns bar.
Review comment: `REGEX_EXTRACT` can also be passed 0 as the index, which means the whole match is extracted.
[GitHub] yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL
yanghua commented on a change in pull request #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL URL: https://github.com/apache/flink/pull/6448#discussion_r207699091
## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala ##
@@ -450,6 +450,40 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
 "")
 }
+ @Test
+ def testRegexExtract(): Unit = {
Review comment: Good point. There is a problem here; I wrote this case to test it:
```scala
testAllApis(
  "foothebar".regexExtract("foo([\\w]+)", 1), //OK, the method got 'foo([\w]+)'
  "'foothebar'.regexExtract('foo([w]+)', 1)", //failed, the method got 'foo([\\w]+)' returns "", but if pass 'foo([\\w]+)' would get compile error.
  "REGEX_EXTRACT('foothebar', 'foo([w]+)', 1)",//OK, the method got 'foo([\w]+)' but must pass four '\'
  "thebar"
)
```
It seems Flink pre-processes a regex that contains `\xxx`. A few days ago, we also hit this issue when testing `similar to` against a regex that contains `\d`. cc @twalthr
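The escaping behaviour described in this comment can be reproduced outside Flink with plain `java.util.regex`. The sketch below is only an illustration: `RegexExtractSketch` and its `regexExtract` helper are hypothetical names, not the code from the pull request, and returning an empty string when nothing matches is an assumption based on the test expectations quoted above. It shows that the pattern `foo([\w]+)` extracts `thebar`, while a pattern that reaches the matcher with a doubled backslash, `foo([\\w]+)`, describes a character class of a literal backslash and `w` and therefore matches nothing.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for a regex_extract function; not Flink's implementation.
final class RegexExtractSketch {

    // Returns null for null input (as the proposed docs state) and "" when
    // nothing matches (an assumption). Index 0 yields the whole match.
    static String regexExtract(String str, String regex, int extractIndex) {
        if (str == null || regex == null) {
            return null;
        }
        Matcher m = Pattern.compile(regex).matcher(str);
        if (m.find() && extractIndex >= 0 && extractIndex <= m.groupCount()) {
            return m.group(extractIndex);
        }
        return "";
    }

    public static void main(String[] args) {
        // Java literal "foo([\\w]+)" is the regex foo([\w]+) at runtime.
        System.out.println(regexExtract("foothebar", "foo([\\w]+)", 1));
        // Java literal "foo([\\\\w]+)" is the regex foo([\\w]+) at runtime:
        // the class now holds a literal backslash and 'w', so 't' does not match.
        System.out.println(regexExtract("foothebar", "foo([\\\\w]+)", 1).isEmpty());
        // Index 0 returns everything the pattern matched.
        System.out.println(regexExtract("foothebar", "foo(.*?)(bar)", 0));
    }
}
```

If a string-parsing layer unescapes or re-escapes backslashes before the pattern is compiled, the number of backslashes that must be written changes, which is consistent with the three different outcomes reported for the Scala, Java-string, and SQL variants.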
[GitHub] Xpray commented on a change in pull request #6465: [FLINK-10008] [table] Improve the LOG function in Table to support bases less than 1
Xpray commented on a change in pull request #6465: [FLINK-10008] [table] Improve the LOG function in Table to support bases less than 1 URL: https://github.com/apache/flink/pull/6465#discussion_r207699770
## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala ##
@@ -103,11 +103,13 @@ object ScalarFunctions {
 if (x <= 0.0) {
   throw new IllegalArgumentException(s"x of 'log(base, x)' must be > 0, but x = $x")
 }
-if (base <= 1.0) {
-  throw new IllegalArgumentException(s"base of 'log(base, x)' must be > 1, but base = $base")
-} else {
-  Math.log(x) / Math.log(base)
+if (base.compareTo(1.0) == 0) {
+  throw new IllegalArgumentException(s"base of 'log(base, x)' can not be 1")
+}
+if (base.compareTo(0.0) <= 0) {
Review comment: Hi @xccui, you're right, there's no need to call `compareTo`.
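For illustration, the checks discussed in this review can be written with plain comparison operators instead of `compareTo`. This is a minimal sketch under stated assumptions, not the merged code: the class name `LogSketch` is hypothetical, and because the quoted diff is cut off after the last `if`, the message of the third exception is a guess.

```java
// Hypothetical sketch of log(base, x) that accepts bases in (0, 1) as well as bases > 1.
final class LogSketch {

    static double log(double base, double x) {
        if (x <= 0.0) {
            throw new IllegalArgumentException("x of 'log(base, x)' must be > 0, but x = " + x);
        }
        if (base == 1.0) {
            // log(1) is 0, so the quotient below would divide by zero.
            throw new IllegalArgumentException("base of 'log(base, x)' can not be 1");
        }
        if (base <= 0.0) {
            // Assumed message; the diff quoted above is truncated at this point.
            throw new IllegalArgumentException("base of 'log(base, x)' must be > 0, but base = " + base);
        }
        // Change-of-base formula: log_base(x) = ln(x) / ln(base).
        return Math.log(x) / Math.log(base);
    }

    public static void main(String[] args) {
        System.out.println(log(2.0, 8.0));  // close to 3
        System.out.println(log(0.5, 8.0));  // close to -3, since 0.5^-3 = 8
    }
}
```

A base between 0 and 1 simply flips the sign of the result, because `Math.log(base)` is negative there.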
[GitHub] tzulitai commented on issue #6078: [FLINK-9438] Add documentation for AvroDeserializationSchema
tzulitai commented on issue #6078: [FLINK-9438] Add documentation for AvroDeserializationSchema URL: https://github.com/apache/flink/pull/6078#issuecomment-411288894 +1, LGTM
[GitHub] asfgit closed pull request #6506: [FLINK-10073] [sql-client] Allow setting a restart strategy in SQL Client
asfgit closed pull request #6506: [FLINK-10073] [sql-client] Allow setting a restart strategy in SQL Client URL: https://github.com/apache/flink/pull/6506

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index d35aa591a4d..0e8d2d651b1 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -108,6 +108,8 @@ Greg, 1
 Both result modes can be useful during the prototyping of SQL queries.

+Attention Queries that are executed in a batch environment, can only be retrieved using the `table` result mode.
+
 After a query is defined, it can be submitted to the cluster as a long-running, detached Flink job. For this, a target system that stores the results needs to be specified using the [INSERT INTO statement](sqlClient.html#detached-sql-queries). The [configuration section](sqlClient.html#configuration) explains how to declare table sources for reading data, how to declare table sinks for writing data, and how to configure other table program properties.

 {% top %}
@@ -204,6 +206,8 @@ execution:
   max-parallelism: 16 # optional: Flink's maximum parallelism (128 by default)
   min-idle-state-retention: 0 # optional: table program's minimum idle state time
   max-idle-state-retention: 0 # optional: table program's maximum idle state time
+  restart-strategy: # optional: restart strategy
+    type: fallback # "fallback" to global restart strategy by default

 # Deployment properties allow for describing the cluster to which table programs are submitted to.
@@ -227,7 +231,35 @@ Depending on the use case, a configuration can be split into multiple files.
 The CLI commands > session environment file > defaults environment file
 {% endhighlight %}

-Queries that are executed in a batch environment, can only be retrieved using the `table` result mode.
+ Restart Strategies
+
+Restart strategies control how Flink jobs are restarted in case of a failure. Similar to [global restart strategies]({{ site.baseurl }}/dev/restart_strategies.html) for a Flink cluster, a more fine-grained restart configuration can be declared in an environment file.
+
+The following strategies are supported:
+
+{% highlight yaml %}
+execution:
+  # falls back to the global strategy defined in flink-conf.yaml
+  restart-strategy:
+    type: fallback
+
+  # job fails directly and no restart is attempted
+  restart-strategy:
+    type: none
+
+  # attempts a given number of times to restart the job
+  restart-strategy:
+    type: fixed-delay
+    attempts: 3 # retries before job is declared as failed (default: Integer.MAX_VALUE)
+    delay: 1 # delay in ms between retries (default: 10 s)
+
+  # attempts as long as the maximum number of failures per time interval is not exceeded
+  restart-strategy:
+    type: failure-rate
+    max-failures-per-interval: 1 # retries in interval until failing (default: 1)
+    failure-rate-interval: 6 # measuring interval in ms for failure rate
+    delay: 1 # delay in ms between retries (default: 10 s)
+{% endhighlight %}

 {% top %}

diff --git a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
index 51e6e95bc1f..8be4ce63ecb 100644
--- a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
@@ -74,6 +74,12 @@ execution:
   min-idle-state-retention: 0
   # maximum idle state retention in ms
   max-idle-state-retention: 0
+  # controls how table programs are restarted in case of a failures
+  restart-strategy:
+    # strategy type
+    # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
+    type: fallback
+
 #==
 # Deployment properties
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
index 0d6e6dd59ac..b7c28938121 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
@@ -18,6 +18,8 @@
 package org.apache.flink.table.client.config;

+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.streaming.api.TimeCharacteristic;

 import java.util.Collections;
@@ -57,10 +59,10 @@
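The `fixed-delay` and `failure-rate` options documented in the diff above can be pictured with a small stand-alone model. This is a sketch of the described behaviour only, not Flink's restart logic: the class `RestartStrategySketch`, its nested `FailureRate` type, and all method names are hypothetical, and the time values in the example are made up.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the restart decisions described in the docs; not Flink code.
final class RestartStrategySketch {

    // "fixed-delay": restarts are allowed until `attempts` restarts have been used up.
    static boolean fixedDelayAllowsRestart(int restartsSoFar, int attempts) {
        return restartsSoFar < attempts;
    }

    // "failure-rate": restarts are allowed while the number of failures inside
    // the measuring interval does not exceed max-failures-per-interval.
    static final class FailureRate {
        private final int maxFailuresPerInterval;
        private final long intervalMs;
        private final Deque<Long> failureTimes = new ArrayDeque<>();

        FailureRate(int maxFailuresPerInterval, long intervalMs) {
            this.maxFailuresPerInterval = maxFailuresPerInterval;
            this.intervalMs = intervalMs;
        }

        // Records a failure at nowMs and reports whether a restart is still allowed.
        boolean recordFailureAndCheck(long nowMs) {
            failureTimes.addLast(nowMs);
            // Forget failures that fell out of the measuring interval.
            while (!failureTimes.isEmpty() && nowMs - failureTimes.peekFirst() > intervalMs) {
                failureTimes.removeFirst();
            }
            return failureTimes.size() <= maxFailuresPerInterval;
        }
    }

    public static void main(String[] args) {
        System.out.println(fixedDelayAllowsRestart(2, 3));
        FailureRate rate = new FailureRate(1, 1000L);
        System.out.println(rate.recordFailureAndCheck(0L));
        System.out.println(rate.recordFailureAndCheck(500L));
    }
}
```

The `delay` option is left out of the model because it only controls how long to wait before a permitted restart, not whether the restart happens.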
[GitHub] GJL commented on a change in pull request #6496: [FLINK-10063][tests] Use runit to supervise mesos processes.
GJL commented on a change in pull request #6496: [FLINK-10063][tests] Use runit to supervise mesos processes. URL: https://github.com/apache/flink/pull/6496#discussion_r208471471
## File path: flink-jepsen/src/jepsen/flink/mesos.clj ##
@@ -24,11 +24,35 @@
 [jepsen.os.debian :as debian]
 [jepsen.flink.zookeeper :refer [zookeeper-uri]]))

+;;; runit process supervisor (http://smarden.org/runit/)
+;;;
+;;; We use runit to supervise Mesos processes because Mesos uses a "fail-fast" approach to
+;;; error handling, e.g., the Mesos master will exit when it discovers it has been partitioned away
+;;; from the Zookeeper quorum.
+
+(def runit-version "2.1.2-3")
+
+(defn create-supervised-service!
+  "Registers a service with the process supervisor and starts it."
+  [service-name cmd]
+  (let [service-dir (str "/etc/sv/" service-name)
+        run-script (str service-dir "/run")]
+    (c/su
+      (c/exec :mkdir :-p service-dir)
+      (c/exec :echo (clojure.string/join "\n" ["#!/bin/sh" cmd]) :> run-script)
Review comment: I'll fix this.