flink git commit: [hotfix][travis] deploy snapshots only for master branch
Repository: flink Updated Branches: refs/heads/master edae79340 -> 1dee62b4b [hotfix][travis] deploy snapshots only for master branch Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1dee62b4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1dee62b4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1dee62b4 Branch: refs/heads/master Commit: 1dee62b4b691fe29f6975844366b7328cf6c41d3 Parents: edae793 Author: Maximilian Michels Authored: Fri Feb 12 11:28:52 2016 +0100 Committer: Maximilian Michels Committed: Fri Feb 12 11:30:36 2016 +0100 -- tools/deploy_to_maven.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/1dee62b4/tools/deploy_to_maven.sh -- diff --git a/tools/deploy_to_maven.sh b/tools/deploy_to_maven.sh index 8ace5aa..b2c6cce 100755 --- a/tools/deploy_to_maven.sh +++ b/tools/deploy_to_maven.sh @@ -72,7 +72,7 @@ pwd # Check if push/commit is eligible for deploying echo "Job: $TRAVIS_JOB_NUMBER ; isPR: $TRAVIS_PULL_REQUEST ; repo slug : $TRAVIS_REPO_SLUG " -if [[ $TRAVIS_PULL_REQUEST == "false" ]] && [[ $TRAVIS_REPO_SLUG == "apache/flink" ]] ; then +if [[ $TRAVIS_PULL_REQUEST == "false" ]] && [[ $TRAVIS_REPO_SLUG == "apache/flink" ]] && [[ $TRAVIS_BRANCH == "master" ]] ; then echo "install lifecylce mapping fake plugin" git clone https://github.com/mfriedenhagen/dummy-lifecycle-mapping-plugin.git
[23/50] [abbrv] flink git commit: [FLINK-3260] [runtime] Enforce terminal state of Executions
[FLINK-3260] [runtime] Enforce terminal state of Executions This commit fixes the problem that Executions could leave their terminal state FINISHED to transition to FAILED. Such a transition will be propagated to the ExecutionGraph where it entails JobStatus changes. Since the Execution already reached a terminal state, it should not again affect the ExecutionGraph. This can lead to an inconsistent state in case of a restart where the old Executions get disassociated from the ExecutionGraph. This closes #1613 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6968a57a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6968a57a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6968a57a Branch: refs/heads/tableOnCalcite Commit: 6968a57a1a31a11b33bacd2c94d6559bcabd6eb9 Parents: 48b7454 Author: Till Rohrmann Authored: Tue Feb 9 10:30:12 2016 +0100 Committer: Stephan Ewen Committed: Wed Feb 10 15:34:37 2016 +0100 -- .../flink/runtime/executiongraph/Execution.java | 14 +- .../ExecutionGraphRestartTest.java | 90 + .../runtime/testingUtils/TestingCluster.scala | 6 +- .../testingUtils/TestingTaskManagerLike.scala | 4 +- .../runtime/testingUtils/TestingUtils.scala | 133 ++- 5 files changed, 233 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6968a57a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index eb2e68c..db037bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -435,7 +435,7 @@ public class Execution implements Serializable { return; } else if (current == CREATED || current == SCHEDULED) { - // from 
here, we can directly switch to cancelled, because the no task has been deployed + // from here, we can directly switch to cancelled, because no task has been deployed if (transitionState(current, CANCELED)) { // we skip the canceling state. set the timestamp, for a consistent appearance @@ -754,11 +754,10 @@ public class Execution implements Serializable { return false; } - if (current == CANCELED) { - // we are already aborting or are already aborted + if (current == CANCELED || current == FINISHED) { + // we are already aborting or are already aborted or we are already finished if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Ignoring transition of vertex %s to %s while being %s", - getVertexWithAttempt(), FAILED, CANCELED)); + LOG.debug("Ignoring transition of vertex {} to {} while being {}.", getVertexWithAttempt(), FAILED, current); } return false; } @@ -928,6 +927,11 @@ public class Execution implements Serializable { } private boolean transitionState(ExecutionState currentState, ExecutionState targetState, Throwable error) { + // sanity check + if (currentState.isTerminal()) { + throw new IllegalStateException("Cannot leave terminal state " + currentState + " to transition to " + targetState + "."); + } + if (STATE_UPDATER.compareAndSet(this, currentState, targetState)) { markTimestamp(targetState); http://git-wip-us.apache.org/repos/asf/flink/blob/6968a57a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index 0c3af8f..47a48a2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b
[37/50] [abbrv] flink git commit: Renamed Table.scala to table.scala
Renamed Table.scala to table.scala Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4abca1d0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4abca1d0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4abca1d0 Branch: refs/heads/tableOnCalcite Commit: 4abca1d0e3bba30f6adee39f9b278020fdf0f4fc Parents: ed6cc91 Author: Fabian Hueske Authored: Mon Jan 25 15:34:24 2016 +0100 Committer: Fabian Hueske Committed: Fri Feb 12 11:34:08 2016 +0100 -- .../org/apache/flink/api/table/Table.scala | 392 --- .../org/apache/flink/api/table/table.scala | 392 +++ 2 files changed, 392 insertions(+), 392 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/4abca1d0/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala deleted file mode 100644 index 271aa99..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala +++ /dev/null @@ -1,392 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.table - -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.`type`.RelDataTypeField -import org.apache.calcite.rel.core.JoinRelType -import org.apache.calcite.rex.RexNode -import org.apache.calcite.tools.RelBuilder -import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey} -import org.apache.flink.api.table.plan.RexNodeTranslator -import RexNodeTranslator.{toRexNode, extractAggCalls} -import org.apache.flink.api.table.expressions.Expression -import org.apache.flink.api.table.parser.ExpressionParser - -import scala.collection.JavaConverters._ - -case class BaseTable( -private[flink] val relNode: RelNode, -private[flink] val relBuilder: RelBuilder) - -/** - * The abstraction for writing Table API programs. Similar to how the batch and streaming APIs - * have [[org.apache.flink.api.scala.DataSet]] and - * [[org.apache.flink.streaming.api.scala.DataStream]]. - * - * Use the methods of [[Table]] to transform data. Use - * [[org.apache.flink.api.java.table.TableEnvironment]] to convert a [[Table]] back to a DataSet - * or DataStream. - * - * When using Scala a [[Table]] can also be converted using implicit conversions. - * - * Example: - * - * {{{ - * val table = set.toTable('a, 'b) - * ... - * val table2 = ... - * val set = table2.toDataSet[MyType] - * }}} - * - * Operations such as [[join]], [[select]], [[where]] and [[groupBy]] either take arguments - * in a Scala DSL or as an expression String. Please refer to the documentation for the expression - * syntax. - */ -class Table( - private[flink] override val relNode: RelNode, - private[flink] override val relBuilder: RelBuilder) - extends BaseTable(relNode, relBuilder) -{ - - /** - * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions - * can contain complex expressions and aggregations. 
- * - * Example: - * - * {{{ - * in.select('key, 'value.avg + " The average" as 'average, 'other.substring(0, 10)) - * }}} - */ - def select(fields: Expression*): Table = { - -relBuilder.push(relNode) - -// separate aggregations and selection expressions -val extractedAggCalls: List[(Expression, List[AggCall])] = fields - .map(extractAggCalls(_, relBuilder)).toList - -// get aggregation calls -val aggCalls: List[AggCall] = extractedAggCalls - .map(_._2).reduce( (x,y) => x ::: y) - -// apply aggregations -if (aggCalls.nonEmpty) { - val emptyKey: GroupKey = relBuilder.groupKey() - relBuilder.aggregate(emptyKey, aggCalls.toIterable.asJava) -} - -// get selection expressions -val exprs: List[RexNode] = extractedAggCalls - .map(_._1) - .map(toRexNode(_, relB
[36/50] [abbrv] flink git commit: [FLINK-3223] Translate Table API calls to Calcite RelNodes.
[FLINK-3223] Translate Table API calls to Calcite RelNodes. This is an intermediate step to port the Table API on top of Calcite (FLINK-3221). This commit: - Adds Calcite as dependency to flink-table. - Translates Table API calls directly into Calcite RelNodes. - Modifies tests to check only the translation into logical plans but not the execution of Table API queries. - Deactivates a few tests that are not supported yet. - Removes a lot of the former Table API translation code. - Removes bitwise operators from the Table API. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed6cc91e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed6cc91e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed6cc91e Branch: refs/heads/tableOnCalcite Commit: ed6cc91e227bc6b053ca9e0142a680f6301bdaf6 Parents: 1dee62b Author: Fabian Hueske Authored: Fri Jan 15 17:46:39 2016 +0100 Committer: Fabian Hueske Committed: Fri Feb 12 11:34:08 2016 +0100 -- flink-libraries/flink-table/pom.xml | 6 + .../api/java/table/JavaBatchTranslator.scala| 340 ++- .../java/table/JavaStreamingTranslator.scala| 241 - .../flink/api/java/table/TableEnvironment.scala | 43 +-- .../api/scala/table/DataStreamConversions.scala | 68 .../api/scala/table/ScalaBatchTranslator.scala | 26 +- .../scala/table/ScalaStreamingTranslator.scala | 58 .../api/scala/table/TableConversions.scala | 12 +- .../flink/api/scala/table/expressionDsl.scala | 5 - .../apache/flink/api/scala/table/package.scala | 21 +- .../org/apache/flink/api/table/Table.scala | 236 + .../table/codegen/ExpressionCodeGenerator.scala | 35 -- .../api/table/expressions/aggregations.scala| 40 +-- .../analysis/ExtractEquiJoinFields.scala| 70 .../expressions/analysis/GroupByAnalyzer.scala | 51 --- .../expressions/analysis/InsertAutoCasts.scala | 92 - .../analysis/PredicateAnalyzer.scala| 35 -- .../analysis/ResolveFieldReferences.scala | 60 .../analysis/SelectionAnalyzer.scala| 36 -- 
.../table/expressions/analysis/TypeCheck.scala | 57 .../expressions/analysis/VerifyBoolean.scala| 41 --- .../analysis/VerifyNoAggregates.scala | 53 --- .../analysis/VerifyNoNestedAggregates.scala | 54 --- .../api/table/expressions/arithmetic.scala | 56 +-- .../flink/api/table/expressions/cast.scala | 5 + .../api/table/expressions/fieldExpression.scala | 5 + .../api/table/parser/ExpressionParser.scala | 16 +- .../api/table/plan/ExpandAggregations.scala | 147 .../flink/api/table/plan/PlanTranslator.scala | 133 ++-- .../api/table/plan/RexNodeTranslator.scala | 184 ++ .../flink/api/table/plan/TypeConverter.scala| 54 +++ .../flink/api/table/plan/operations.scala | 134 .../api/table/plan/operators/DataSetTable.scala | 66 .../apache/flink/api/table/plan/package.scala | 24 -- .../apache/flink/api/table/trees/Analyzer.scala | 43 --- .../org/apache/flink/api/table/trees/Rule.scala | 30 -- .../examples/scala/StreamingTableFilter.scala | 90 - .../api/java/table/test/AggregationsITCase.java | 67 ++-- .../flink/api/java/table/test/AsITCase.java | 68 ++-- .../api/java/table/test/CastingITCase.java | 62 ++-- .../api/java/table/test/ExpressionsITCase.java | 83 + .../flink/api/java/table/test/FilterITCase.java | 58 ++-- .../table/test/GroupedAggregationsITCase.java | 35 +- .../flink/api/java/table/test/JoinITCase.java | 70 ++-- .../api/java/table/test/PojoGroupingITCase.java | 18 +- .../flink/api/java/table/test/SelectITCase.java | 70 ++-- .../api/java/table/test/SqlExplainITCase.java | 7 + .../table/test/StringExpressionsITCase.java | 45 +-- .../flink/api/java/table/test/UnionITCase.java | 44 +-- .../scala/table/test/PageRankTableITCase.java | 7 +- .../scala/table/test/TypeExceptionTest.scala| 42 --- .../scala/table/test/AggregationsITCase.scala | 104 +++--- .../flink/api/scala/table/test/AsITCase.scala | 74 ++-- .../api/scala/table/test/CastingITCase.scala| 57 ++-- .../scala/table/test/ExpressionsITCase.scala| 90 ++--- .../api/scala/table/test/FilterITCase.scala | 59 ++-- 
.../table/test/GroupedAggregationsITCase.scala | 63 ++-- .../flink/api/scala/table/test/JoinITCase.scala | 73 ++-- .../api/scala/table/test/SelectITCase.scala | 99 +++--- .../api/scala/table/test/SqlExplainITCase.scala | 198 +-- .../table/test/StringExpressionsITCase.scala| 52 +-- .../api/scala/table/test/UnionITCase.scala | 34 +- 62 files changed, 1314 insertions(+), 2932 deletions(-) -- http://git-wip-us.apache.org/repos/as
[50/50] [abbrv] flink git commit: [FLINK-3226] implement getUniqueName method in TranslationContext
[FLINK-3226] implement getUniqueName method in TranslationContext This closes #1600 and #1567 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7428263 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7428263 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7428263 Branch: refs/heads/tableOnCalcite Commit: e742826326c2719f2f5e06a5e4020f743c3278f9 Parents: 18e7f2f Author: vasia Authored: Thu Feb 11 14:24:24 2016 +0100 Committer: Fabian Hueske Committed: Fri Feb 12 11:34:10 2016 +0100 -- .../flink/api/table/plan/RexNodeTranslator.scala | 2 +- .../flink/api/table/plan/TranslationContext.scala | 4 .../plan/nodes/dataset/DataSetGroupReduce.scala | 2 +- .../api/java/table/test/AggregationsITCase.java | 4 +--- .../api/scala/table/test/AggregationsITCase.scala | 18 +++--- .../api/scala/table/test/ExpressionsITCase.scala | 1 - 6 files changed, 22 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/e7428263/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala index 07e3924..bad111f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala @@ -35,7 +35,7 @@ object RexNodeTranslator { exp match { case agg: Aggregation => -val name = "TMP_" + agg.hashCode().toHexString.toUpperCase +val name = TranslationContext.getUniqueName val aggCall = toAggCall(agg, name, relBuilder) val fieldExp = new UnresolvedFieldReference(name) (fieldExp, List(aggCall)) 
http://git-wip-us.apache.org/repos/asf/flink/blob/e7428263/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala index b2b0c2b..51af8d6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala @@ -70,6 +70,10 @@ object TranslationContext { } + def getUniqueName: String = { +"TMP_" + nameCntr.getAndIncrement() + } + def getRelBuilder: RelBuilder = { relBuilder } http://git-wip-us.apache.org/repos/asf/flink/blob/e7428263/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala index ad7e0e9..afe09bb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala @@ -67,7 +67,7 @@ class DataSetGroupReduce( config: TableConfig, expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { -val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config) +val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config, expectedType) // get the output types val fieldsNames = rowType.getFieldNames http://git-wip-us.apache.org/repos/asf/flink/blob/e7428263/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java -- diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java index 8e81893..30598c4 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java @@ -62,7 +62,6 @@ public class AggregationsITCase exten
[39/50] [abbrv] flink git commit: [Flink-3226] Translate logical plan FlinkRels into physical plan DataSetRels.
[Flink-3226] Translate logical plan FlinkRels into physical plan DataSetRels. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6dd2d779 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6dd2d779 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6dd2d779 Branch: refs/heads/tableOnCalcite Commit: 6dd2d779d18113034311f13512deb5a7afbf885e Parents: a4ad9dd Author: chengxiang li Authored: Mon Feb 1 15:18:14 2016 +0800 Committer: Fabian Hueske Committed: Fri Feb 12 11:34:09 2016 +0100 -- .../flink/api/table/plan/PlanGenException.scala | 26 .../flink/api/table/plan/TypeConverter.scala| 13 +- .../plan/functions/AggregateFunction.scala | 71 + .../table/plan/functions/FunctionUtils.scala| 37 + .../plan/functions/aggregate/Aggregate.scala| 42 ++ .../functions/aggregate/AggregateFactory.scala | 135 + .../plan/functions/aggregate/AvgAggregate.scala | 148 +++ .../functions/aggregate/CountAggregate.scala| 34 + .../plan/functions/aggregate/MaxAggregate.scala | 136 + .../plan/functions/aggregate/MinAggregate.scala | 136 + .../plan/functions/aggregate/SumAggregate.scala | 130 .../plan/nodes/dataset/DataSetGroupReduce.scala | 6 +- .../table/plan/nodes/dataset/DataSetJoin.scala | 6 +- .../plan/nodes/dataset/DataSetReduce.scala | 6 +- .../rules/dataset/DataSetAggregateRule.scala| 17 ++- .../plan/rules/dataset/DataSetJoinRule.scala| 102 - 16 files changed, 1025 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6dd2d779/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanGenException.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanGenException.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanGenException.scala new file mode 100644 index 0000000..2fd400d --- /dev/null +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanGenException.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan + +class PlanGenException(message: String, exception: Exception) extends +RuntimeException(message: String, exception: Exception){ + + def this(message: String){ +this(message, null) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6dd2d779/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala index 227b3e8..b7cb200 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala @@ -19,7 +19,7 @@ package org.apache.flink.api.table.plan import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.calcite.rel.core.JoinRelType._ import 
org.apache.calcite.sql.`type`.SqlTypeName._ import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} @@ -29,8 +29,11 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo import org.apache.flink.api.java.typeutils.ValueTypeInfo._ import org.apache.flink.api.table.typeinfo.RowTypeInfo import org.apache.flink.api.table.{Row, TableException} - import scala.collection.JavaConversions._ +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo +import org.apache.flink.api.java.operators.join.JoinType +import org.apache.calcite
[01/50] [abbrv] flink git commit: [FLINK-3334] [conf] Include year-month-day in the log output [Forced Update!]
Repository: flink Updated Branches: refs/heads/tableOnCalcite fff25df5e -> e74282632 (forced update) [FLINK-3334] [conf] Include year-month-day in the log output Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d51bec15 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d51bec15 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d51bec15 Branch: refs/heads/tableOnCalcite Commit: d51bec1524a9832a7e6c0f6c5d5be5a42712d365 Parents: 457cb14 Author: Stephan Ewen Authored: Mon Feb 8 17:38:52 2016 +0100 Committer: Stephan Ewen Committed: Mon Feb 8 19:59:31 2016 +0100 -- flink-dist/src/main/flink-bin/conf/log4j-cli.properties| 4 ++-- .../src/main/flink-bin/conf/log4j-yarn-session.properties | 2 +- flink-dist/src/main/flink-bin/conf/log4j.properties| 2 +- flink-dist/src/main/flink-bin/conf/logback-yarn.xml| 6 +++--- flink-dist/src/main/flink-bin/conf/logback.xml | 5 +++-- 5 files changed, 10 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/d51bec15/flink-dist/src/main/flink-bin/conf/log4j-cli.properties -- diff --git a/flink-dist/src/main/flink-bin/conf/log4j-cli.properties b/flink-dist/src/main/flink-bin/conf/log4j-cli.properties index 9c56e61..acb9d1a 100644 --- a/flink-dist/src/main/flink-bin/conf/log4j-cli.properties +++ b/flink-dist/src/main/flink-bin/conf/log4j-cli.properties @@ -23,7 +23,7 @@ log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.file=${log.file} log4j.appender.file.append=false log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n +log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n # Log output from org.apache.flink.yarn to the console. 
This is used by the @@ -34,7 +34,7 @@ log4j.logger.org.apache.hadoop=INFO, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n +log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n # suppress the warning that hadoop native libraries are not loaded (irrelevant for the client) log4j.logger.org.apache.hadoop.util.NativeCodeLoader=OFF http://git-wip-us.apache.org/repos/asf/flink/blob/d51bec15/flink-dist/src/main/flink-bin/conf/log4j-yarn-session.properties -- diff --git a/flink-dist/src/main/flink-bin/conf/log4j-yarn-session.properties b/flink-dist/src/main/flink-bin/conf/log4j-yarn-session.properties index 1f49676..07f65a7 100644 --- a/flink-dist/src/main/flink-bin/conf/log4j-yarn-session.properties +++ b/flink-dist/src/main/flink-bin/conf/log4j-yarn-session.properties @@ -21,7 +21,7 @@ log4j.rootLogger=INFO, stdout # Log all infos in the given file log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n # suppress the irrelevant (wrong) warnings from the netty channel handler log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, stdout http://git-wip-us.apache.org/repos/asf/flink/blob/d51bec15/flink-dist/src/main/flink-bin/conf/log4j.properties -- diff --git a/flink-dist/src/main/flink-bin/conf/log4j.properties b/flink-dist/src/main/flink-bin/conf/log4j.properties index adcff38..97ec653 100644 --- a/flink-dist/src/main/flink-bin/conf/log4j.properties +++ b/flink-dist/src/main/flink-bin/conf/log4j.properties @@ -23,7 +23,7 @@ log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.file=${log.file} 
log4j.appender.file.append=false log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n +log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n # suppress the irrelevant (wrong) warnings from the netty channel handler log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file http://git-wip-us.apache.org/repos/asf/flink/blob/d51bec15/flink-dist/src/main/flink-bin/conf/logback-yarn.xml -- diff --git a/flink-dist/src/main/flink-bin/conf/logback-yarn.xml b/flink-dist/src/main/flink-bin/conf/logback-yar
[46/50] [abbrv] flink git commit: [FLINK-3226] implement GroupReduce translation; enable tests for supported operations
http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala -- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala index f300547..c56ab92 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala @@ -77,7 +77,6 @@ class ExpressionsITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } - // advanced functions not supported yet @Ignore @Test def testCaseInsensitiveForAs(): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala -- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala index 50ce150..82c4dc2 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala @@ -46,7 +46,7 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[NotImplementedError]) + @Test def testGroupedAggregate(): Unit = { // the grouping key needs to be forwarded to the intermediate DataSet, even @@ -62,7 +62,7 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram 
TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[NotImplementedError]) + @Test def testGroupingKeyForwardIfNotUsed(): Unit = { // the grouping key needs to be forwarded to the intermediate DataSet, even @@ -78,7 +78,7 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[NotImplementedError]) + @Test def testGroupNoAggregation(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment
[33/50] [abbrv] flink git commit: [hotfix][travis] deploy snapshots only for master branch
[hotfix][travis] deploy snapshots only for master branch Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1dee62b4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1dee62b4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1dee62b4 Branch: refs/heads/tableOnCalcite Commit: 1dee62b4b691fe29f6975844366b7328cf6c41d3 Parents: edae793 Author: Maximilian Michels Authored: Fri Feb 12 11:28:52 2016 +0100 Committer: Maximilian Michels Committed: Fri Feb 12 11:30:36 2016 +0100 -- tools/deploy_to_maven.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/1dee62b4/tools/deploy_to_maven.sh -- diff --git a/tools/deploy_to_maven.sh b/tools/deploy_to_maven.sh index 8ace5aa..b2c6cce 100755 --- a/tools/deploy_to_maven.sh +++ b/tools/deploy_to_maven.sh @@ -72,7 +72,7 @@ pwd # Check if push/commit is eligible for deploying echo "Job: $TRAVIS_JOB_NUMBER ; isPR: $TRAVIS_PULL_REQUEST ; repo slug : $TRAVIS_REPO_SLUG " -if [[ $TRAVIS_PULL_REQUEST == "false" ]] && [[ $TRAVIS_REPO_SLUG == "apache/flink" ]] ; then +if [[ $TRAVIS_PULL_REQUEST == "false" ]] && [[ $TRAVIS_REPO_SLUG == "apache/flink" ]] && [[ $TRAVIS_BRANCH == "master" ]] ; then echo "install lifecylce mapping fake plugin" git clone https://github.com/mfriedenhagen/dummy-lifecycle-mapping-plugin.git
[13/50] [abbrv] flink git commit: [FLINK-3366] Rename @Experimental annotation to @PublicEvolving
http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java -- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java index 5dd2988..223ebee 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java @@ -18,7 +18,7 @@ package org.apache.flink.api.java; -import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Public; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; @@ -183,7 +183,7 @@ public class RemoteEnvironment extends ExecutionEnvironment { } @Override - @Experimental + @PublicEvolving public void startNewSession() throws Exception { dispose(); jobID = JobID.generate(); http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java -- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java index 2eda077..fdd114e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java @@ -17,6 +17,7 @@ */ package org.apache.flink.api.java.functions; + import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.util.Collector; http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java -- diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java index dd00c31..0ce518e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java @@ -26,7 +26,7 @@ import java.lang.annotation.Retention; import java.util.HashSet; import java.util.Set; -import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; import org.apache.flink.api.common.InvalidProgramException; @@ -310,7 +310,7 @@ public class FunctionAnnotation { */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) - @Experimental + @PublicEvolving public @interface ReadFields { String[] value(); } @@ -341,7 +341,7 @@ public class FunctionAnnotation { */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) - @Experimental + @PublicEvolving public @interface ReadFieldsFirst { String[] value(); } @@ -372,7 +372,7 @@ public class FunctionAnnotation { */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) - @Experimental + @PublicEvolving public @interface ReadFieldsSecond { String[] value(); } @@ -389,7 +389,7 @@ public class FunctionAnnotation { */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) - @Experimental + @PublicEvolving public @interface SkipCodeAnalysis { } http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java -- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java index 9c6621d..3d656a4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java +++ 
b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java @@ -22,7 +22,7 @@ import java.util.ArrayList; import java.util.Arrays; import org.apache.flink.annotation.Public; -import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.operators.DataSource; @@ -110,7 +110,7 @@ public class
[18/50] [abbrv] flink git commit: [FLINK-3341] Make 'auto.offset.reset' compatible with Kafka 0.8 and 0.9
[FLINK-3341] Make 'auto.offset.reset' compatible with Kafka 0.8 and 0.9 This closes #1597 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9173825a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9173825a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9173825a Branch: refs/heads/tableOnCalcite Commit: 9173825aa6a1525d72a78cda16cb4ae1e9b8a8e4 Parents: 8ccd754 Author: Robert Metzger Authored: Sat Feb 6 13:27:06 2016 +0100 Committer: Stephan Ewen Committed: Wed Feb 10 15:12:34 2016 +0100 -- .../connectors/kafka/FlinkKafkaConsumer08.java | 2 +- .../kafka/internals/LegacyFetcher.java | 3 ++- .../connectors/kafka/Kafka08ITCase.java | 22 ++-- .../kafka/KafkaTestEnvironmentImpl.java | 20 +++--- .../kafka/KafkaTestEnvironmentImpl.java | 22 +--- .../connectors/kafka/KafkaConsumerTestBase.java | 5 ++--- .../connectors/kafka/KafkaTestBase.java | 2 -- .../connectors/kafka/KafkaTestEnvironment.java | 3 --- .../flink/yarn/YARNSessionFIFOITCase.java | 1 + 9 files changed, 25 insertions(+), 55 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java -- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java index bdea37f..1cdfffe 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java @@ -70,7 +70,7 @@ import static com.google.common.base.Preconditions.checkNotNull; * 
socket.timeout.ms * socket.receive.buffer.bytes * fetch.message.max.bytes - * auto.offset.reset with the values "latest", "earliest" (unlike 0.8.2 behavior) + * auto.offset.reset with the values "largest", "smallest" * fetch.wait.max.ms * * http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java -- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java index fe7f777..10f4c41 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java @@ -576,7 +576,8 @@ public class LegacyFetcher implements Fetcher { private static long getInvalidOffsetBehavior(Properties config) { long timeType; - if (config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest").equals("latest")) { + String val = config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest"); + if (val.equals("largest") || val.equals("latest")) { // largest is kafka 0.8, latest is kafka 0.9 timeType = OffsetRequest.LatestTime(); } else { timeType = OffsetRequest.EarliestTime(); http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java -- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java index 6a2fa27..a3e815e 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java +++ b/flink-streaming-con
[16/50] [abbrv] flink git commit: [docs] Fixed typo in Storm Compatibility page
[docs] Fixed typo in Storm Compatibility page This closes #1618 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aeee6efd Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aeee6efd Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aeee6efd Branch: refs/heads/tableOnCalcite Commit: aeee6efd451dcbdda37ed05779bda74291079ed5 Parents: 0a63797 Author: Georgios Andrianakis Authored: Wed Feb 10 14:30:46 2016 +0200 Committer: mjsax Committed: Wed Feb 10 14:51:14 2016 +0100 -- docs/apis/streaming/storm_compatibility.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/aeee6efd/docs/apis/streaming/storm_compatibility.md -- diff --git a/docs/apis/streaming/storm_compatibility.md b/docs/apis/streaming/storm_compatibility.md index d646040..4d5715c 100644 --- a/docs/apis/streaming/storm_compatibility.md +++ b/docs/apis/streaming/storm_compatibility.md @@ -228,8 +228,8 @@ DataStream> multiStream = ... SplitStream> splitStream = multiStream.split(new StormStreamSelector()); // remove SplitStreamType using SplitStreamMapper to get data stream of type SomeType -DataStream s1 = splitStream.select("s1").map(new SplitStreamMapper()).returns(SomeType.classs); -DataStream s2 = splitStream.select("s2").map(new SplitStreamMapper()).returns(SomeType.classs); +DataStream s1 = splitStream.select("s1").map(new SplitStreamMapper()).returns(SomeType.class); +DataStream s2 = splitStream.select("s2").map(new SplitStreamMapper()).returns(SomeType.class); // do further processing on s1 and s2 [...]
[31/50] [abbrv] flink git commit: [FLINK-3389] [rocksdb] Add pre-defined option profiles.
[FLINK-3389] [rocksdb] Add pre-defined option profiles. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/be72758d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/be72758d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/be72758d Branch: refs/heads/tableOnCalcite Commit: be72758d1104400c8a48554d717c5b8cea5b3617 Parents: 82c7383 Author: Stephan Ewen Authored: Thu Feb 11 15:30:56 2016 +0100 Committer: Stephan Ewen Committed: Thu Feb 11 21:34:03 2016 +0100 -- .../contrib/streaming/state/OptionsFactory.java | 32 ++- .../streaming/state/PredefinedOptions.java | 91 .../streaming/state/RocksDBStateBackend.java| 76 ++-- 3 files changed, 190 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/be72758d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java -- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java index 73b1e5d..3e52f1f 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java @@ -24,8 +24,36 @@ import org.rocksdb.Options; * A factory for {@link Options} to be passed to the {@link RocksDBStateBackend}. * Options have to be created lazily by this factory, because the {@code Options} * class is not serializable and holds pointers to native code. 
+ * + * A typical pattern to use this OptionsFactory is as follows: + * + * Java 8: + * {@code + * rocksDbBackend.setOptions( (currentOptions) -> currentOptions.setMaxOpenFiles(1024) ); + * } + * + * Java 7: + * {@code + * rocksDbBackend.setOptions(new OptionsFactory() { + * + * public Options createOptions(Options currentOptions) { + * return currentOptions.setMaxOpenFiles(1024); + * } + * }); + * } */ public interface OptionsFactory extends java.io.Serializable { - - Options createOptions(); + + /** +* This method should set the additional options on top of the current options object. +* The current options object may contain pre-defined options based on flags that have +* been configured on the state backend. +* +* It is important to set the options on the current object and return the result from +* the setter methods, otherwise the pre-defined options may get lost. +* +* @param currentOptions The options object with the pre-defined options. +* @return The options object on which the additional options are set. +*/ + Options createOptions(Options currentOptions); } http://git-wip-us.apache.org/repos/asf/flink/blob/be72758d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java -- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java new file mode 100644 index 000..383f043 --- /dev/null +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.rocksdb.CompactionStyle; +import org.rocksdb.Options; + +/** + * The {@code PredefinedOptions} are configuration settings for the {@link RocksDBStateBackend}. + * The various
[19/50] [abbrv] flink git commit: [FLINK-3350] [tests] Increase default test Akka ask and ZooKeeper timeouts
[FLINK-3350] [tests] Increase default test Akka ask and ZooKeeper timeouts Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8f40251 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8f40251 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8f40251 Branch: refs/heads/tableOnCalcite Commit: b8f40251c6c45379118254c21b0d553c2d3b8937 Parents: 9173825 Author: Ufuk Celebi Authored: Mon Feb 8 14:24:43 2016 +0100 Committer: Stephan Ewen Committed: Wed Feb 10 15:26:43 2016 +0100 -- .../runtime/minicluster/FlinkMiniCluster.scala | 20 ++-- .../minicluster/LocalFlinkMiniCluster.scala | 2 ++ .../runtime/testutils/ZooKeeperTestUtils.java | 5 +++-- .../runtime/testingUtils/TestingCluster.scala | 2 ++ .../kafka/KafkaTestEnvironmentImpl.java | 7 --- .../kafka/KafkaTestEnvironmentImpl.java | 7 --- ...ctTaskManagerProcessFailureRecoveryTest.java | 3 +++ .../JobManagerCheckpointRecoveryITCase.java | 8 ++-- .../recovery/ProcessFailureCancelingITCase.java | 2 +- 9 files changed, 43 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala -- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 4cdda3f..0346d6d 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -42,7 +42,7 @@ import org.apache.flink.runtime.webmonitor.{WebMonitorUtils, WebMonitor} import org.slf4j.LoggerFactory -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.{Duration, FiniteDuration} import scala.concurrent._ import scala.concurrent.forkjoin.ForkJoinPool @@ -86,7 +86,7 @@ abstract class 
FlinkMiniCluster( implicit val executionContext = ExecutionContext.global - implicit val timeout = AkkaUtils.getTimeout(userConfiguration) + implicit val timeout = AkkaUtils.getTimeout(configuration) val recoveryMode = RecoveryMode.fromConfig(configuration) @@ -188,6 +188,22 @@ abstract class FlinkMiniCluster( AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort))) } + /** +* Sets CI environment (Travis) specific config defaults. +*/ + def setDefaultCiConfig(config: Configuration) : Unit = { +// https://docs.travis-ci.com/user/environment-variables#Default-Environment-Variables +if (sys.env.contains("CI")) { + // Only set if nothing specified in config + if (config.getString(ConfigConstants.AKKA_ASK_TIMEOUT, null) == null) { +val duration = Duration(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT) * 10 +config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, s"${duration.toSeconds}s") + +LOG.info(s"Akka ask timeout set to ${duration.toSeconds}s") + } +} + } + // -- // Start/Stop Methods // -- http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala -- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 913aec0..c803429 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -48,6 +48,8 @@ class LocalFlinkMiniCluster( override def generateConfiguration(userConfiguration: Configuration): Configuration = { val config = getDefaultConfig +setDefaultCiConfig(config) + config.addAll(userConfiguration) setMemory(config) initializeIOFormatClasses(config) 
http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java index 6c33835..75569ec 100644 --- a/fl
[14/50] [abbrv] flink git commit: [FLINK-3366] Rename @Experimental annotation to @PublicEvolving
[FLINK-3366] Rename @Experimental annotation to @PublicEvolving This closes #1599 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/572855da Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/572855da Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/572855da Branch: refs/heads/tableOnCalcite Commit: 572855daad452eab169bc2ca27f9f1e4476df656 Parents: 59b237b Author: Fabian Hueske Authored: Mon Feb 8 14:14:01 2016 +0100 Committer: Fabian Hueske Committed: Wed Feb 10 11:51:26 2016 +0100 -- .../apache/flink/annotation/Experimental.java | 35 --- .../apache/flink/annotation/PublicEvolving.java | 40 .../flink/api/common/ExecutionConfig.java | 16 ++--- .../flink/api/common/JobExecutionResult.java| 4 +- .../functions/IterationRuntimeContext.java | 4 +- .../api/common/functions/RuntimeContext.java| 22 +++ .../util/AbstractRuntimeUDFContext.java | 12 ++-- .../common/io/statistics/BaseStatistics.java| 14 ++--- .../api/common/typeinfo/BasicArrayTypeInfo.java | 22 +++ .../api/common/typeinfo/BasicTypeInfo.java | 22 +++ .../api/common/typeinfo/NothingTypeInfo.java| 16 ++--- .../common/typeinfo/PrimitiveArrayTypeInfo.java | 24 +++ .../api/common/typeinfo/TypeInformation.java| 20 +++--- .../api/common/typeutils/CompositeType.java | 36 +-- .../flink/api/java/typeutils/AvroTypeInfo.java | 4 +- .../api/java/typeutils/EitherTypeInfo.java | 18 +++--- .../flink/api/java/typeutils/EnumTypeInfo.java | 20 +++--- .../api/java/typeutils/GenericTypeInfo.java | 20 +++--- .../api/java/typeutils/ObjectArrayTypeInfo.java | 22 +++ .../flink/api/java/typeutils/PojoTypeInfo.java | 28 - .../flink/api/java/typeutils/TupleTypeInfo.java | 16 ++--- .../flink/api/java/typeutils/TypeExtractor.java | 66 ++-- .../flink/api/java/typeutils/ValueTypeInfo.java | 24 +++ .../api/java/typeutils/WritableTypeInfo.java| 22 +++ .../java/org/apache/flink/api/java/DataSet.java | 6 +- .../flink/api/java/ExecutionEnvironment.java| 32 +- 
.../apache/flink/api/java/LocalEnvironment.java | 4 +- .../flink/api/java/RemoteEnvironment.java | 4 +- .../flink/api/java/functions/FirstReducer.java | 1 + .../api/java/functions/FunctionAnnotation.java | 10 +-- .../org/apache/flink/api/java/io/CsvReader.java | 4 +- .../flink/api/java/operators/CrossOperator.java | 4 +- .../flink/api/java/operators/DataSink.java | 6 +- .../flink/api/java/operators/DataSource.java| 4 +- .../api/java/operators/DeltaIteration.java | 6 +- .../api/java/operators/IterativeDataSet.java| 8 +-- .../flink/api/java/operators/JoinOperator.java | 4 +- .../api/java/operators/ProjectOperator.java | 4 +- .../flink/api/java/utils/DataSetUtils.java | 4 +- .../flink/api/java/utils/ParameterTool.java | 4 +- .../org/apache/flink/api/scala/DataSet.scala| 8 +-- .../flink/api/scala/ExecutionEnvironment.scala | 34 +- .../api/scala/typeutils/CaseClassTypeInfo.scala | 16 ++--- .../api/scala/typeutils/EitherTypeInfo.scala| 18 +++--- .../api/scala/typeutils/EnumValueTypeInfo.scala | 20 +++--- .../api/scala/typeutils/OptionTypeInfo.scala| 18 +++--- .../scala/typeutils/ScalaNothingTypeInfo.scala | 16 ++--- .../scala/typeutils/TraversableTypeInfo.scala | 18 +++--- .../flink/api/scala/typeutils/TryTypeInfo.scala | 18 +++--- .../api/scala/typeutils/UnitTypeInfo.scala | 16 ++--- .../apache/flink/api/scala/utils/package.scala | 5 +- .../api/datastream/AllWindowedStream.java | 8 +-- .../api/datastream/CoGroupedStreams.java| 8 +-- .../api/datastream/ConnectedStreams.java| 4 +- .../streaming/api/datastream/DataStream.java| 46 +++--- .../api/datastream/DataStreamSink.java | 6 +- .../api/datastream/IterativeStream.java | 4 +- .../streaming/api/datastream/JoinedStreams.java | 10 +-- .../streaming/api/datastream/KeyedStream.java | 6 +- .../datastream/SingleOutputStreamOperator.java | 20 +++--- .../streaming/api/datastream/SplitStream.java | 4 +- .../api/datastream/WindowedStream.java | 8 +-- .../api/environment/CheckpointConfig.java | 6 +- 
.../environment/StreamExecutionEnvironment.java | 36 +-- .../source/EventTimeSourceFunction.java | 4 +- .../api/functions/source/SourceFunction.java| 6 +- .../streaming/api/scala/AllWindowedStream.scala | 6 +- .../streaming/api/scala/CoGroupedStreams.scala | 10 +-- .../flink/streaming/api/scala/DataStream.scala | 57 - .../streaming/api/scala/JoinedStreams.scala | 8 +-- .../flink/streaming/api/scala/KeyedStream.scala | 4 +- .../api/scala/StreamExecutionEnvironment.scala | 28 - .../streaming/api/scala/Windo
[42/50] [abbrv] flink git commit: [FLINK-3225] Implemented optimization of Table API queries via Calcite
[FLINK-3225] Implemented optimization of Table API queries via Calcite - added logical Flink nodes and translation rules - added stubs for DataSet translation rules - ported DataSetNodes to Scala - reactivated tests and added expected NotImplementedError Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe5e4065 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe5e4065 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe5e4065 Branch: refs/heads/tableOnCalcite Commit: fe5e4065643dcb208f3990897f171780311502e9 Parents: 20235e0 Author: Fabian Hueske Authored: Tue Jan 26 13:22:38 2016 +0100 Committer: Fabian Hueske Committed: Fri Feb 12 11:34:09 2016 +0100 -- .../apache/flink/api/table/package-info.java| 33 - .../api/table/sql/calcite/DataSetRelNode.java | 29 - .../table/sql/calcite/node/DataSetExchange.java | 60 -- .../table/sql/calcite/node/DataSetFlatMap.java | 56 - .../api/table/sql/calcite/node/DataSetJoin.java | 80 - .../api/table/sql/calcite/node/DataSetMap.java | 58 - .../table/sql/calcite/node/DataSetReduce.java | 58 - .../sql/calcite/node/DataSetReduceGroup.java| 62 -- .../api/table/sql/calcite/node/DataSetSort.java | 59 - .../table/sql/calcite/node/DataSetSource.java | 55 - .../table/sql/calcite/node/DataSetUnion.java| 51 .../api/java/table/JavaBatchTranslator.scala| 68 +++ .../api/table/plan/TranslationContext.scala | 79 .../plan/nodes/dataset/DataSetConvention.scala | 42 +++ .../plan/nodes/dataset/DataSetExchange.scala| 63 ++ .../plan/nodes/dataset/DataSetFlatMap.scala | 62 ++ .../plan/nodes/dataset/DataSetGroupReduce.scala | 63 ++ .../table/plan/nodes/dataset/DataSetJoin.scala | 73 +++ .../table/plan/nodes/dataset/DataSetMap.scala | 63 ++ .../plan/nodes/dataset/DataSetReduce.scala | 63 ++ .../table/plan/nodes/dataset/DataSetRel.scala | 33 + .../table/plan/nodes/dataset/DataSetSort.scala | 62 ++ .../plan/nodes/dataset/DataSetSource.scala | 55 + 
.../table/plan/nodes/dataset/DataSetUnion.scala | 62 ++ .../plan/nodes/logical/FlinkAggregate.scala | 76 .../table/plan/nodes/logical/FlinkCalc.scala| 37 ++ .../plan/nodes/logical/FlinkConvention.scala| 42 +++ .../table/plan/nodes/logical/FlinkFilter.scala | 42 +++ .../table/plan/nodes/logical/FlinkJoin.scala| 46 +++ .../table/plan/nodes/logical/FlinkProject.scala | 45 +++ .../api/table/plan/nodes/logical/FlinkRel.scala | 25 .../table/plan/nodes/logical/FlinkScan.scala| 31 + .../table/plan/nodes/logical/FlinkUnion.scala | 38 ++ .../api/table/plan/operators/DataSetTable.scala | 66 -- .../api/table/plan/rules/FlinkRuleSets.scala| 120 +++ .../rules/dataset/DataSetAggregateRule.scala| 53 .../plan/rules/dataset/DataSetCalcRule.scala| 52 .../plan/rules/dataset/DataSetFilterRule.scala | 52 .../plan/rules/dataset/DataSetJoinRule.scala| 59 + .../plan/rules/dataset/DataSetProjectRule.scala | 52 .../plan/rules/dataset/DataSetScanRule.scala| 53 .../plan/rules/dataset/DataSetUnionRule.scala | 53 .../plan/rules/logical/FlinkAggregateRule.scala | 53 .../plan/rules/logical/FlinkCalcRule.scala | 50 .../plan/rules/logical/FlinkFilterRule.scala| 50 .../plan/rules/logical/FlinkJoinRule.scala | 54 + .../plan/rules/logical/FlinkProjectRule.scala | 51 .../plan/rules/logical/FlinkScanRule.scala | 53 .../plan/rules/logical/FlinkUnionRule.scala | 54 + .../api/table/plan/schema/DataSetTable.scala| 89 ++ .../org/apache/flink/api/table/table.scala | 1 - .../api/java/table/test/AggregationsITCase.java | 72 +-- .../flink/api/java/table/test/AsITCase.java | 62 +- .../api/java/table/test/CastingITCase.java | 53 .../api/java/table/test/ExpressionsITCase.java | 32 ++--- .../flink/api/java/table/test/FilterITCase.java | 69 +-- .../table/test/GroupedAggregationsITCase.java | 40 +++ .../flink/api/java/table/test/JoinITCase.java | 70 ++- .../api/java/table/test/PojoGroupingITCase.java | 19 +-- .../flink/api/java/table/test/SelectITCase.java | 68 +-- .../table/test/StringExpressionsITCase.java | 40 --- 
.../flink/api/java/table/test/UnionITCase.java | 47 .../scala/table/test/AggregationsITCase.scala | 59 + .../flink/api/scala/table/test/AsITCase.scala | 50 .../api/scala/table/test/CastingITCase.scal
[25/50] [abbrv] flink git commit: [FLINK-3371] [api-breaking] Move TriggerResult and TriggerContext to dedicated classes
[FLINK-3371] [api-breaking] Move TriggerResult and TriggerContext to dedicated classes This closes #1603 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50bd65a5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50bd65a5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50bd65a5 Branch: refs/heads/tableOnCalcite Commit: 50bd65a574776817a03dd32fd438cb2327447109 Parents: 8df0bba Author: Stephan Ewen Authored: Sun Feb 7 21:46:16 2016 +0100 Committer: Stephan Ewen Committed: Wed Feb 10 22:15:31 2016 +0100 -- .../examples/windowing/SessionWindowing.java| 3 +- .../api/windowing/assigners/GlobalWindows.java | 10 +- .../triggers/ContinuousEventTimeTrigger.java| 7 +- .../ContinuousProcessingTimeTrigger.java| 2 +- .../api/windowing/triggers/CountTrigger.java| 2 +- .../api/windowing/triggers/DeltaTrigger.java| 7 +- .../windowing/triggers/EventTimeTrigger.java| 5 +- .../triggers/ProcessingTimeTrigger.java | 5 +- .../api/windowing/triggers/PurgingTrigger.java | 4 +- .../api/windowing/triggers/Trigger.java | 102 +-- .../api/windowing/triggers/TriggerResult.java | 96 + .../windowing/EvictingWindowOperator.java | 5 +- .../windowing/NonKeyedWindowOperator.java | 34 --- .../operators/windowing/WindowOperator.java | 18 ++-- 14 files changed, 179 insertions(+), 121 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java -- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java index bd82800..e2df160 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java +++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import java.util.ArrayList; @@ -95,7 +96,7 @@ public class SessionWindowing { env.execute(); } - private static class SessionTrigger implements Trigger, GlobalWindow> { + private static class SessionTrigger extends Trigger, GlobalWindow> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java index d3eb2ac..a4d92cf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import java.util.Collection; @@ -67,15 +68,12 @@ public class GlobalWindows extends 
WindowAssigner { /** * A trigger that never fires, as default Trigger for GlobalWindows. */ - private static class NeverTrigger implements Trigger { + private static class NeverTrigger extends Trigger { private static final long serialVersionUID = 1L; @Override - public TriggerResult onElement(Object element, - long timestamp, - GlobalWindow window, -
[03/50] [abbrv] flink git commit: [FLINK-3357] [core] Drop AbstractID#toShortString()
[FLINK-3357] [core] Drop AbstractID#toShortString() This closes #1601 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28feede7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28feede7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28feede7 Branch: refs/heads/tableOnCalcite Commit: 28feede7d40dc73ec861cf93393650b8b10afc3a Parents: 5c47f38 Author: Ufuk Celebi Authored: Mon Feb 8 16:05:27 2016 +0100 Committer: Stephan Ewen Committed: Mon Feb 8 20:18:20 2016 +0100 -- .../streaming/state/RocksDBStateBackend.java| 4 +-- .../contrib/streaming/state/DbStateBackend.java | 35 ++-- .../streaming/state/DbStateBackendTest.java | 10 +++--- .../java/org/apache/flink/util/AbstractID.java | 15 - ...taskExecutionAttemptAccumulatorsHandler.java | 2 +- .../InputGateDeploymentDescriptor.java | 2 +- .../io/network/partition/ResultPartitionID.java | 2 +- 7 files changed, 28 insertions(+), 42 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java -- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index aaaeea4..eefa4a9 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -81,11 +81,11 @@ public class RocksDBStateBackend extends AbstractStateBackend { } private File getDbPath(String stateName) { - return new File(new File(new File(new File(dbBasePath), jobId.toShortString()), operatorIdentifier), stateName); + return new File(new File(new File(new 
File(dbBasePath), jobId.toString()), operatorIdentifier), stateName); } private String getCheckpointPath(String stateName) { - return checkpointDirectory + "/" + jobId.toShortString() + "/" + operatorIdentifier + "/" + stateName; + return checkpointDirectory + "/" + jobId.toString() + "/" + operatorIdentifier + "/" + stateName; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java -- diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java index 1d1ccd7..5162983 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java @@ -17,14 +17,6 @@ package org.apache.flink.contrib.streaming.state; -import java.io.Serializable; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Random; -import java.util.concurrent.Callable; - import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingState; @@ -42,6 +34,14 @@ import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Random; +import java.util.concurrent.Callable; + import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry; /** @@ -76,6 +76,8 @@ public class DbStateBackend extends AbstractStateBackend { private 
transient Environment env; + private transient String appId; + // -- private final DbBackendConfig dbConfig; @@ -159,19 +161,14 @@ public class DbStateBackend extends AbstractStateBackend { // store the checkpoint id and timestamp for bookkeeping long handleId = rnd.nextLong(); - // We use the ApplicationID here, because
[40/50] [abbrv] flink git commit: [FLINK-3225] Implemented optimization of Table API queries via Calcite
http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java -- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java index da82b8a..a3d31da 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java @@ -18,7 +18,6 @@ package org.apache.flink.api.java.table.test; -import org.apache.flink.api.table.ExpressionException; import org.apache.flink.api.table.Table; import org.apache.flink.api.table.Row; import org.apache.flink.api.java.DataSet; @@ -30,6 +29,7 @@ import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import scala.NotImplementedError; import java.util.List; @@ -41,7 +41,7 @@ public class SelectITCase extends MultipleProgramsTestBase { super(mode); } - @Test + @Test(expected = NotImplementedError.class) public void testSimpleSelectAllWithAs() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -53,19 +53,19 @@ public class SelectITCase extends MultipleProgramsTestBase { Table result = in .select("a, b, c"); -// DataSet resultSet = tableEnv.toDataSet(result, Row.class); -// List results = resultSet.collect(); -// String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + -// "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + -// "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + -// "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + -// 
"Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + -// "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"; -// compareResultAsText(results, expected); + DataSet resultSet = tableEnv.toDataSet(result, Row.class); + List results = resultSet.collect(); + String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + + "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + + "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + + "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + + "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + + "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"; + compareResultAsText(results, expected); } - @Test + @Test(expected = NotImplementedError.class) public void testSimpleSelectWithNaming() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -78,12 +78,12 @@ public class SelectITCase extends MultipleProgramsTestBase { .select("f0 as a, f1 as b") .select("a, b"); -// DataSet resultSet = tableEnv.toDataSet(result, Row.class); -// List results = resultSet.collect(); -// String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + -// "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + -// "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"; -// compareResultAsText(results, expected); + DataSet resultSet = tableEnv.toDataSet(result, Row.class); + List results = resultSet.collect(); + String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + + "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"; + compareResultAsText(results, expected); 
} @Tes
[11/50] [abbrv] flink git commit: [build system] Skip javadoc generation for java8 and snapshot deployments
[build system] Skip javadoc generation for java8 and snapshot deployments Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59b237b5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59b237b5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59b237b5 Branch: refs/heads/tableOnCalcite Commit: 59b237b5d2169d50f886af64df880cb614408890 Parents: de231d7 Author: Robert Metzger Authored: Tue Feb 9 23:50:45 2016 +0100 Committer: Robert Metzger Committed: Tue Feb 9 23:50:45 2016 +0100 -- .travis.yml | 4 ++-- tools/deploy_to_maven.sh | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/59b237b5/.travis.yml -- diff --git a/.travis.yml b/.travis.yml index 50b5fab..de3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -17,9 +17,9 @@ language: java matrix: include: - jdk: "oraclejdk8" - env: PROFILE="-Dhadoop.version=2.7.1 -Dscala-2.11 -Pinclude-yarn-tests" + env: PROFILE="-Dhadoop.version=2.7.1 -Dscala-2.11 -Pinclude-yarn-tests -Dmaven.javadoc.skip=true" - jdk: "oraclejdk8" - env: PROFILE="-Dhadoop.version=2.5.0 -Pinclude-yarn-tests" + env: PROFILE="-Dhadoop.version=2.5.0 -Pinclude-yarn-tests -Dmaven.javadoc.skip=true" - jdk: "openjdk7" env: PROFILE="-Dhadoop.version=2.4.0 -Dscala-2.11 -Pinclude-yarn-tests" - jdk: "oraclejdk7" # this uploads the Hadoop 2 build to Maven and S3 http://git-wip-us.apache.org/repos/asf/flink/blob/59b237b5/tools/deploy_to_maven.sh -- diff --git a/tools/deploy_to_maven.sh b/tools/deploy_to_maven.sh index 6dcc806..8ace5aa 100755 --- a/tools/deploy_to_maven.sh +++ b/tools/deploy_to_maven.sh @@ -99,7 +99,7 @@ if [[ $TRAVIS_PULL_REQUEST == "false" ]] && [[ $TRAVIS_REPO_SLUG == "apache/flin # Deploy hadoop v1 to maven echo "Generating poms for hadoop1" ./tools/generate_specific_pom.sh $CURRENT_FLINK_VERSION $CURRENT_FLINK_VERSION_HADOOP1 pom.hadoop1.xml - mvn -B -f pom.hadoop1.xml -Pdocs-and-source -DskipTests 
-Drat.ignoreErrors=true deploy --settings deploysettings.xml; + mvn -B -f pom.hadoop1.xml -DskipTests -Drat.ignoreErrors=true deploy --settings deploysettings.xml; # deploy to s3 deploy_to_s3 $CURRENT_FLINK_VERSION "hadoop1" @@ -116,13 +116,13 @@ if [[ $TRAVIS_PULL_REQUEST == "false" ]] && [[ $TRAVIS_REPO_SLUG == "apache/flin # deploy hadoop v2 (yarn) echo "deploy standard version (hadoop2) for scala 2.10 from flink2 directory" # do the hadoop2 scala 2.10 in the background - (mvn -B -DskipTests -Pdocs-and-source -Drat.skip=true -Drat.ignoreErrors=true clean deploy --settings deploysettings.xml; deploy_to_s3 $CURRENT_FLINK_VERSION "hadoop2" ) & + (mvn -B -DskipTests -Drat.skip=true -Drat.ignoreErrors=true clean deploy --settings deploysettings.xml; deploy_to_s3 $CURRENT_FLINK_VERSION "hadoop2" ) & # switch back to the regular flink directory cd ../flink echo "deploy hadoop2 version (standard) for scala 2.11 from flink directory" ./tools/change-scala-version.sh 2.11 - mvn -B -DskipTests -Pdocs-and-source -Drat.skip=true -Drat.ignoreErrors=true clean deploy --settings deploysettings.xml; + mvn -B -DskipTests -Drat.skip=true -Drat.ignoreErrors=true clean deploy --settings deploysettings.xml; deploy_to_s3 $CURRENT_FLINK_VERSION "hadoop2_2.11"
[20/50] [abbrv] flink git commit: [hotfix] [tests] Ignore ZooKeeper logs in process tests
[hotfix] [tests] Ignore ZooKeeper logs in process tests Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3a643c07 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3a643c07 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3a643c07 Branch: refs/heads/tableOnCalcite Commit: 3a643c07792c62142c1f8cda172d4f4c3442c9b3 Parents: b8f4025 Author: Ufuk Celebi Authored: Tue Feb 9 11:01:39 2016 +0100 Committer: Stephan Ewen Committed: Wed Feb 10 15:27:16 2016 +0100 -- .../JobManagerSubmittedJobGraphsRecoveryITCase.java | 6 +- .../org/apache/flink/runtime/testutils/CommonTestUtils.java | 1 + .../AbstractJobManagerProcessFailureRecoveryITCase.java | 8 +--- .../org/apache/flink/test/recovery/ChaosMonkeyITCase.java| 4 .../test/recovery/JobManagerCheckpointRecoveryITCase.java| 8 5 files changed, 23 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/3a643c07/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java index 99f7bd7..59c7c39 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java @@ -343,6 +343,10 @@ public class JobManagerSubmittedJobGraphsRecoveryITCase extends TestLogger { assertEquals(2, jobSubmitSuccessMessages); } catch (Throwable t) { + // Print early (in some situations the process logs get too big + // for Travis and the root problem is not shown) + t.printStackTrace(); + // In case of an error, print the job manager process logs. 
if (jobManagerProcess[0] != null) { jobManagerProcess[0].printProcessLog(); @@ -352,7 +356,7 @@ public class JobManagerSubmittedJobGraphsRecoveryITCase extends TestLogger { jobManagerProcess[1].printProcessLog(); } - t.printStackTrace(); + throw t; } finally { if (jobManagerProcess[0] != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/3a643c07/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java index 069b6af..bbb6a89 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java @@ -147,6 +147,7 @@ public class CommonTestUtils { writer.println("log4j.appender.console.layout=org.apache.log4j.PatternLayout"); writer.println("log4j.appender.console.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n"); writer.println("log4j.logger.org.eclipse.jetty.util.log=OFF"); + writer.println("log4j.logger.org.apache.zookeeper=OFF"); writer.flush(); writer.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/3a643c07/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java -- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java index 2f6b762..6122352 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java @@ -246,8 +246,10 @@ public abstract class AbstractJobManagerProcessFailureRecoveryITCase extends Tes fail("The program 
encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
[09/50] [abbrv] flink git commit: [hotfix] Add missing entries to config reference doc and remove outdated webclient entries
[hotfix] Add missing entries to condif reference doc and removed outdated webclient entries Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/72f8228a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/72f8228a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/72f8228a Branch: refs/heads/tableOnCalcite Commit: 72f8228ada76898c0bf168e5db8788dc35f7f4ed Parents: c57a7e9 Author: Stephan Ewen Authored: Tue Feb 9 16:01:05 2016 +0100 Committer: Stephan Ewen Committed: Tue Feb 9 16:01:05 2016 +0100 -- docs/setup/config.md | 33 ++--- 1 file changed, 18 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/72f8228a/docs/setup/config.md -- diff --git a/docs/setup/config.md b/docs/setup/config.md index 0eb3acb..343a856 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -39,6 +39,8 @@ The configuration files for the TaskManagers can be different, Flink does not as - `env.java.home`: The path to the Java installation to use (DEFAULT: system's default Java installation, if found). Needs to be specified if the startup scripts fail to automatically resolve the java home directory. Can be specified to point to a specific java installation or version. If this option is not specified, the startup scripts also evaluate the `$JAVA_HOME` environment variable. +- `env.java.opts`: Set custom JVM options. This value is respected by Flink's start scripts and Flink's YARN client. This can be used to set different garbage collectors or to include remote debuggers into the JVMs running Flink's services. + - `jobmanager.rpc.address`: The IP address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost). - `jobmanager.rpc.port`: The port number of the JobManager (DEFAULT: 6123). 
@@ -72,6 +74,14 @@ The default fraction for managed memory can be adjusted using the `taskmanager.m - `taskmanager.memory.preallocate`: Can be either of `true` or `false`. Specifies whether task managers should allocate all managed memory when starting up. (DEFAULT: false) +### Memory and Performance Debugging + +These options are useful for debugging a Flink application for memory and garbage collection related isues, such as performance and out-of-memory process kills or exceptions. + +- `taskmanager.debug.memory.startLogThread`: Causes the TaskManagers to periodically log memory and Garbage collection statistics. The statistics include current heap-, off-heap, and other memory pool utilization, as well as the time spent on garbage collection, by heap memory pool. + +- `taskmanager.debug.memory.logIntervalMs`: The interval (in milliseconds) in which the TaskManagers log the memory and garbage collection statistics. Only has an effect, if `taskmanager.debug.memory.startLogThread` is set to true. + ### Kerberos Flink supports Kerberos authentication of Hadoop services such as HDFS, YARN, or HBase. @@ -98,8 +108,6 @@ If you are on YARN, then it is sufficient to authenticate the client with Kerber - `taskmanager.network.numberOfBuffers`: The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: 2048). -- `env.java.opts`: Set custom JVM options. This value is respected by Flink's start scripts and Flink's YARN client. This can be used to set different garbage collectors or to include remote debuggers into the JVMs running Flink's services. - - `state.backend`: The backend that will be used to store operator state checkpoints if checkpointing is enabled. 
Supported backends: - `jobmanager`: In-memory state, backup to JobManager's/ZooKeeper's memory. Should be used only for minimal state (Kafka offsets) or testing and local debugging. - `filesystem`: State is in-memory on the TaskManagers, and state snapshots are stored in a file system. Supported are all filesystems supported by Flink, for example HDFS, S3, ... @@ -130,6 +138,7 @@ The following parameters configure Flink's JobManager and TaskManagers. - `jobmanager.rpc.address`: The IP address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost). - `jobmanager.rpc.port`: The port number of the JobManager (DEFAULT: 6123). +- `taskmanager.hostname`: The hostname of the network interface that the TaskManager binds to. By default, the TaskManager searches for network interfaces that can connect to the JobManager and other TaskManagers. This op
[15/50] [abbrv] flink git commit: [FLINK-3234] [dataSet] Add KeySelector support to sortPartition operation.
[FLINK-3234] [dataSet] Add KeySelector support to sortPartition operation. This closes #1585 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a63797a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a63797a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a63797a Branch: refs/heads/tableOnCalcite Commit: 0a63797a6a5418b2363bca25bd77c33c217ff257 Parents: 572855d Author: Chiwan Park Authored: Thu Feb 4 20:46:10 2016 +0900 Committer: Fabian Hueske Committed: Wed Feb 10 11:51:26 2016 +0100 -- .../java/org/apache/flink/api/java/DataSet.java | 18 ++ .../java/operators/SortPartitionOperator.java | 174 +-- .../api/java/operator/SortPartitionTest.java| 82 + .../org/apache/flink/api/scala/DataSet.scala| 25 +++ .../api/scala/PartitionSortedDataSet.scala | 22 ++- .../javaApiOperators/SortPartitionITCase.java | 61 +++ .../scala/operators/SortPartitionITCase.scala | 59 +++ 7 files changed, 385 insertions(+), 56 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/0a63797a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java -- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index bfb97f4..c315920 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -1381,6 +1381,24 @@ public abstract class DataSet { return new SortPartitionOperator<>(this, field, order, Utils.getCallLocationName()); } + /** +* Locally sorts the partitions of the DataSet on the extracted key in the specified order. +* The DataSet can be sorted on multiple values by returning a tuple from the KeySelector. +* +* Note that no additional sort keys can be appended to a KeySelector sort keys. To sort +* the partitions by multiple values using KeySelector, the KeySelector must return a tuple +* consisting of the values. 
+* +* @param keyExtractor The KeySelector function which extracts the key values from the DataSet +* on which the DataSet is sorted. +* @param order The order in which the DataSet is sorted. +* @return The DataSet with sorted local partitions. +*/ + public SortPartitionOperator sortPartition(KeySelector keyExtractor, Order order) { + final TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType()); + return new SortPartitionOperator<>(this, new Keys.SelectorFunctionKeys<>(clean(keyExtractor), getType(), keyType), order, Utils.getCallLocationName()); + } + // // Top-K // http://git-wip-us.apache.org/repos/asf/flink/blob/0a63797a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java -- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java index 354a0cd..7f30a30 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java @@ -26,9 +26,13 @@ import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; -import java.util.Arrays; +import java.util.ArrayList; +import java.util.List; /** * This operator represents a DataSet with locally sorted partitions. 
@@ -38,27 +42,58 @@ import java.util.Arrays; @Public public class SortPartitionOperator extends SingleInputOperator> { - private int[] sortKeyPositions; + private List> keys; - private Order[] sortOrders; + private List orders; private final String sortLocationName; + private boolean useKeySelector; - public SortPartitionOperator(DataSet dataSet, int sortField, Order so
[43/50] [abbrv] flink git commit: [FLINK-3226] Implement a CodeGenerator for an efficient translation to DataSet programs
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala deleted file mode 100644 index 4927fb2..000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.typeinfo - -import java.util - -import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.CompositeType.{TypeComparatorBuilder, -FlatFieldDescriptor} -import org.apache.flink.api.common.typeutils.{CompositeType, TypeSerializer} - -/** - * A TypeInformation that is used to rename fields of an underlying CompositeType. 
This - * allows the system to translate "as" Table API operations to a [[RenameOperator]] - * that does not get translated to a runtime operator. - */ -class RenamingProxyTypeInfo[T]( -val tpe: CompositeType[T], -val fieldNames: Array[String]) - extends CompositeType[T](tpe.getTypeClass) { - - def getUnderlyingType: CompositeType[T] = tpe - - if (tpe.getArity != fieldNames.length) { -throw new IllegalArgumentException(s"Number of field names '${fieldNames.mkString(",")}' and " + - s"number of fields in underlying type $tpe do not match.") - } - - if (fieldNames.toSet.size != fieldNames.length) { -throw new IllegalArgumentException(s"New field names must be unique. " + - s"Names: ${fieldNames.mkString(",")}.") - } - - override def getFieldIndex(fieldName: String): Int = { -val result = fieldNames.indexOf(fieldName) -if (result != fieldNames.lastIndexOf(fieldName)) { - -2 -} else { - result -} - } - override def getFieldNames: Array[String] = fieldNames - - override def isBasicType: Boolean = tpe.isBasicType - - override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = -tpe.createSerializer(executionConfig) - - override def getArity: Int = tpe.getArity - - override def isKeyType: Boolean = tpe.isKeyType - - override def getTypeClass: Class[T] = tpe.getTypeClass - - override def getGenericParameters: java.util.List[TypeInformation[_]] = tpe.getGenericParameters - - override def isTupleType: Boolean = tpe.isTupleType - - override def toString = { -s"RenamingType(type: ${tpe.getTypeClass.getSimpleName}; " + - s"fields: ${fieldNames.mkString(", ")})" - } - - override def getTypeAt[X](pos: Int): TypeInformation[X] = tpe.getTypeAt(pos) - - override def getTotalFields: Int = tpe.getTotalFields - - override def createComparator( -logicalKeyFields: Array[Int], -orders: Array[Boolean], -logicalFieldOffset: Int, -executionConfig: ExecutionConfig) = -tpe.createComparator(logicalKeyFields, orders, logicalFieldOffset, executionConfig) - - override def 
getFlatFields( - fieldExpression: String, - offset: Int, - result: util.List[FlatFieldDescriptor]): Unit = { - -// split of head of field expression -val (head, tail) = if (fieldExpression.indexOf('.') >= 0) { - fieldExpression.splitAt(fieldExpression.indexOf('.')) -} else { - (fieldExpression, "") -} - -// replace proxy field name by original field name of wrapped type -val headPos = getFieldIndex(head) -if (headPos >= 0) { - val resolvedHead = tpe.getFieldNames()(headPos) - val resolvedFieldExpr = resolvedHead + tail - - // get flat fields with wrapped field name - tpe.getFlatFields(resolvedFieldExpr, offset, result) -} -else { - throw new IllegalArgumentException(s"Invalid field expression: ${fieldExpression}") -} - } - - override def getTypeAt[X](fieldExpression: String): TypeInformation[X] = { -tpe.getTypeAt(fieldExpr
[35/50] [abbrv] flink git commit: [FLINK-3223] Translate Table API calls to Calcite RelNodes.
http://git-wip-us.apache.org/repos/asf/flink/blob/ed6cc91e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala deleted file mode 100644 index 07acf1e..000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.expressions.analysis - -import org.apache.flink.api.table.ExpressionException -import org.apache.flink.api.table.expressions.{Expression, Aggregation} - -import scala.collection.mutable - -import org.apache.flink.api.table.trees.Rule - -/** - * Rule that verifies that an expression does not contain aggregate operations - * as children of aggregate operations. 
- */ -class VerifyNoNestedAggregates extends Rule[Expression] { - - def apply(expr: Expression) = { -val errors = mutable.MutableList[String]() - -val result = expr.transformPre { - case agg: Aggregation=> { -if (agg.child.exists(_.isInstanceOf[Aggregation])) { - errors += s"""Found nested aggregation inside "$agg".""" -} -agg - } -} - -if (errors.length > 0) { - throw new ExpressionException( -s"""Invalid expression "$expr": ${errors.mkString(" ")}""") -} - -result - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/ed6cc91e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala index e866ea0..c23fa03 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala @@ -17,8 +17,8 @@ */ package org.apache.flink.api.table.expressions +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo} import org.apache.flink.api.table.ExpressionException -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo, NumericTypeInfo, TypeInformation} abstract class BinaryArithmetic extends BinaryExpression { self: Product => def typeInfo = { @@ -83,7 +83,7 @@ case class Mul(left: Expression, right: Expression) extends BinaryArithmetic { } case class Mod(left: Expression, right: Expression) extends BinaryArithmetic { - override def toString = s"($left * $right)" + override def toString = s"($left % $right)" } case class Abs(child: Expression) extends UnaryExpression { @@ -91,55 +91,3 @@ case class Abs(child: Expression) extends UnaryExpression { override def toString = s"abs($child)" } - -abstract class BitwiseBinaryArithmetic 
extends BinaryExpression { self: Product => - def typeInfo: TypeInformation[_] = { -if (!left.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { - throw new ExpressionException( -s"""Non-integer operand ${left} of type ${left.typeInfo} in $this""") -} -if (!right.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { - throw new ExpressionException( -s"""Non-integer operand "${right}" of type ${right.typeInfo} in $this""") -} -if (left.typeInfo != right.typeInfo) { - throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + -s"${right.typeInfo} in $this") -} -if (left.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) { - left.typeInfo -} else { - BasicTypeInfo.INT_TYPE_INFO -} - } -} - -case class BitwiseAnd(left: Expression, right: Expression) extends BitwiseBinaryArit
[08/50] [abbrv] flink git commit: [FLINK-3373] [build] Bump version of transitive HTTP Components dependency to 4.4.4 (core) / 4.5.1 (client)
[FLINK-3373] [build] Bump version of transitive HTTP Components dependency to 4.4.4 (core) / 4.5.1 (client) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c57a7e91 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c57a7e91 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c57a7e91 Branch: refs/heads/tableOnCalcite Commit: c57a7e910e1dcb45c2c3f7b7743a1c8b37ce7639 Parents: a4f0692 Author: Stephan Ewen Authored: Tue Feb 9 14:58:09 2016 +0100 Committer: Stephan Ewen Committed: Tue Feb 9 15:00:51 2016 +0100 -- pom.xml | 11 ++- 1 file changed, 2 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/c57a7e91/pom.xml -- diff --git a/pom.xml b/pom.xml index 1f1a772..81528c8 100644 --- a/pom.xml +++ b/pom.xml @@ -240,13 +240,6 @@ under the License. 1.7 - - - stax - stax-api - 1.0.1 - - com.esotericsoftware.kryo @@ -351,13 +344,13 @@ under the License. org.apache.httpcomponents httpcore - 4.2.5 + 4.4.4 org.apache.httpcomponents httpclient - 4.2.6 + 4.5.1
[07/50] [abbrv] flink git commit: [FLINK-3286] Remove the files from the debian packaging as well
[FLINK-3286] Remove the files from the debian packaging as well Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4f0692e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4f0692e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4f0692e Branch: refs/heads/tableOnCalcite Commit: a4f0692e9c15b3e30c25716eca2d83e7140d687e Parents: 9ee1679 Author: Robert Metzger Authored: Tue Feb 9 10:10:56 2016 +0100 Committer: Robert Metzger Committed: Tue Feb 9 12:09:29 2016 +0100 -- flink-dist/src/deb/bin/jobmanager | 236 --- flink-dist/src/deb/bin/taskmanager | 236 --- flink-dist/src/deb/bin/webclient| 236 --- flink-dist/src/deb/control/control | 26 flink-dist/src/deb/control/postinst | 75 -- 5 files changed, 809 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/a4f0692e/flink-dist/src/deb/bin/jobmanager -- diff --git a/flink-dist/src/deb/bin/jobmanager b/flink-dist/src/deb/bin/jobmanager deleted file mode 100755 index 1ec0172..000 --- a/flink-dist/src/deb/bin/jobmanager +++ /dev/null @@ -1,236 +0,0 @@ -#! /bin/sh - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -# -# skeleton example file to build /etc/init.d/ scripts. 
-#This file should be used to construct scripts for /etc/init.d. -# -#Written by Miquel van Smoorenburg . -#Modified for Debian -#by Ian Murdock . -# Further changes by Javier Fernandez-Sanguino -# -# Version: @(#)skeleton 1.9 26-Feb-2001 miqu...@cistron.nl -# -# Starts a flink jobmanager -# -# chkconfig: 2345 85 15 -# description: flink jobmanager -# -### BEGIN INIT INFO -# Provides: flink-jobmanager -# Required-Start:$network $local_fs -# Required-Stop: -# Should-Start: $named -# Should-Stop: -# Default-Start: 2 3 4 5 -# Default-Stop: 0 1 6 -# Short-Description: flink jobmanager daemon -### END INIT INFO - -# Include flink defaults if available -if [ -f /etc/default/flink ] ; then - . /etc/default/flink -fi - - -if [ "$FLINK_PID_DIR" = "" ]; then - FLINK_PID_DIR=/tmp -fi - -if [ "$FLINK_IDENT_STRING" = "" ]; then - FLINK_IDENT_STRING="$USER" -fi - -FLINK_HOME=/usr/share/flink-dist -PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin -DAEMON_SCRIPT=$FLINK_HOME/bin/jobmanager.sh -NAME=flink-jobmanager -DESC="flink jobmanager daemon" -PID_FILE=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-jobmanager.pid - -test -x $DAEMON_SCRIPT || exit 1 - - -DODTIME=5 # Time to wait for the server to die, in seconds -# If this value is set too low you might not -# let some servers to die gracefully and -# 'restart' will not work - -# Checks if the given pid represents a live process. -# Returns 0 if the pid is a live process, 1 otherwise -flink_is_process_alive() { - local pid="$1" - ps -fp $pid | grep $pid | grep jobmanager > /dev/null 2>&1 -} - -# Check if the process associated to a pidfile is running. -# Return 0 if the pidfile exists and the process is running, 1 otherwise -flink_check_pidfile() { - local pidfile="$1" # IN - local pid - - pid=`cat "$pidfile" 2>/dev/null` - if [ "$pid" = '' ]; then -# The file probably does not exist or is empty. 
-return 1 - fi - - set -- $pid - pid="$1" - - flink_is_process_alive $pid -} - -flink_process_kill() { - local pid="$1"# IN - local signal="$2" # IN - local second - - kill -$signal $pid 2>/dev/null - - # Wait a bit to see if the dirty job has really been done - for second in 0 1 2 3 4 5 6 7 8 9 10; do - if flink_is_process_alive "$pid"; then - # Success - return 0 - fi - - sleep 1 - done -
[27/50] [abbrv] flink git commit: [FLINK-3369] [runtime] Make RemoteTransportException instance of CancelTaskException
[FLINK-3369] [runtime] Make RemoteTransportException instance of CancelTaskException Problem: RemoteTransportException (RTE) is thrown on data transfer failures when the remote data producer fails. Because RTE is an instance of IOException, it can happen that the RTE is reported as the root job failure cause. Solution: Make RTE instance of CancelTaskException, leading to cancellation of the task and not failure. Squashes the following commit: [pr-comments] Add remote address to RemoteTransportException This closes #1621. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf3ae88b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf3ae88b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf3ae88b Branch: refs/heads/tableOnCalcite Commit: cf3ae88b73e30a2d69ac1cc6009a8304ea3f53cc Parents: fd324ea Author: Ufuk Celebi Authored: Wed Feb 10 19:51:20 2016 +0100 Committer: Ufuk Celebi Committed: Thu Feb 11 14:39:40 2016 +0100 -- .../runtime/execution/CancelTaskException.java | 4 ++ .../network/netty/PartitionRequestClient.java | 6 +-- .../netty/PartitionRequestClientFactory.java| 4 +- .../netty/PartitionRequestClientHandler.java| 17 +++- .../exception/LocalTransportException.java | 22 +++--- .../exception/RemoteTransportException.java | 36 +--- .../netty/exception/TransportException.java | 43 7 files changed, 62 insertions(+), 70 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/cf3ae88b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/CancelTaskException.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/CancelTaskException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/CancelTaskException.java index 3bcbe2e..ebf58ec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/CancelTaskException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/CancelTaskException.java 
@@ -34,6 +34,10 @@ public class CancelTaskException extends RuntimeException { super(msg); } + public CancelTaskException(String msg, Throwable cause) { + super(msg, cause); + } + public CancelTaskException() { super(); } http://git-wip-us.apache.org/repos/asf/flink/blob/cf3ae88b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java index f6120d4..fb24a8e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java @@ -114,7 +114,7 @@ public class PartitionRequestClient { inputChannel.onError( new LocalTransportException( "Sending the partition request failed.", - future.channel().localAddress(), future.cause() + future.cause() )); } } @@ -158,7 +158,7 @@ public class PartitionRequestClient { if (!future.isSuccess()) { inputChannel.onError(new LocalTransportException( "Sending the task event failed.", - future.channel().localAddress(), future.cause() + future.cause() )); } } @@ -185,7 +185,7 @@ public class PartitionRequestClient { private void checkNotClosed() throws IOException { if (closeReferenceCounter.isDisposed()) { - throw new LocalTransportEx
[34/50] [abbrv] flink git commit: [FLINK-3223] Translate Table API calls to Calcite RelNodes.
http://git-wip-us.apache.org/repos/asf/flink/blob/ed6cc91e/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java -- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java index 9e42f53..da82b8a 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java @@ -53,15 +53,15 @@ public class SelectITCase extends MultipleProgramsTestBase { Table result = in .select("a, b, c"); - DataSet resultSet = tableEnv.toDataSet(result, Row.class); - List results = resultSet.collect(); - String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + - "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + - "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + - "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + - "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + - "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"; - compareResultAsText(results, expected); +// DataSet resultSet = tableEnv.toDataSet(result, Row.class); +// List results = resultSet.collect(); +// String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + +// "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + +// "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + +// "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + +// "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + +// "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"; +// 
compareResultAsText(results, expected); } @@ -78,15 +78,15 @@ public class SelectITCase extends MultipleProgramsTestBase { .select("f0 as a, f1 as b") .select("a, b"); - DataSet resultSet = tableEnv.toDataSet(result, Row.class); - List results = resultSet.collect(); - String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + - "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + - "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"; - compareResultAsText(results, expected); +// DataSet resultSet = tableEnv.toDataSet(result, Row.class); +// List results = resultSet.collect(); +// String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + +// "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + +// "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"; +// compareResultAsText(results, expected); } - @Test(expected = ExpressionException.class) + @Test(expected = IllegalArgumentException.class) public void testAsWithToFewFields() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -95,13 +95,13 @@ public class SelectITCase extends MultipleProgramsTestBase { Table in = tableEnv.fromDataSet(ds, "a, b"); - DataSet resultSet = tableEnv.toDataSet(in, Row.class); - List results = resultSet.collect(); - String expected = " sorry dude "; - compareResultAsText(results, expected); +// DataSet resultSet = tableEnv.toDataSet(in, Row.class); +// List results = resultSet.collect(); +// String expected = " sorry dude "; +// compareResultAsText(results, expected); } - @Test(expected = ExpressionException.class) + @Test(expected = IllegalArgumentException.class) public void testAsWithToManyFields() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment
[45/50] [abbrv] flink git commit: [FLINK-3226] Implement a CodeGenerator for an efficient translation to DataSet programs
[FLINK-3226] Implement a CodeGenerator for an efficient translation to DataSet programs This closes #1595 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4ad9dd5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4ad9dd5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4ad9dd5 Branch: refs/heads/tableOnCalcite Commit: a4ad9dd566353c578ad660fda7039757a605f27d Parents: 99f60c8 Author: twalthr Authored: Fri Feb 5 17:40:54 2016 +0100 Committer: Fabian Hueske Committed: Fri Feb 12 11:34:09 2016 +0100 -- .../api/java/table/JavaBatchTranslator.scala| 11 +- .../flink/api/java/table/TableEnvironment.scala | 15 +- .../api/scala/table/ScalaBatchTranslator.scala | 6 +- .../api/scala/table/TableConversions.scala | 9 +- .../api/scala/table/TableEnvironment.scala | 75 ++ .../apache/flink/api/table/TableConfig.scala| 30 +- .../apache/flink/api/table/TableException.scala | 23 + .../api/table/codegen/CodeGenException.scala| 24 + .../flink/api/table/codegen/CodeGenUtils.scala | 176 .../flink/api/table/codegen/CodeGenerator.scala | 752 ++ .../table/codegen/ExpressionCodeGenerator.scala | 794 --- .../api/table/codegen/GenerateFilter.scala | 99 --- .../flink/api/table/codegen/GenerateJoin.scala | 171 .../table/codegen/GenerateResultAssembler.scala | 119 --- .../api/table/codegen/GenerateSelect.scala | 84 -- .../api/table/codegen/GeneratedExpression.scala | 27 + .../api/table/codegen/GeneratedFunction.scala | 23 + .../flink/api/table/codegen/Indenter.scala | 10 +- .../api/table/codegen/OperatorCodeGen.scala | 367 + .../flink/api/table/expressions/literals.scala | 6 +- .../flink/api/table/plan/TypeConverter.scala| 83 +- .../plan/nodes/dataset/DataSetExchange.scala| 11 +- .../plan/nodes/dataset/DataSetFlatMap.scala | 24 +- .../plan/nodes/dataset/DataSetGroupReduce.scala | 7 +- .../table/plan/nodes/dataset/DataSetJoin.scala | 7 +- .../table/plan/nodes/dataset/DataSetMap.scala | 26 +- 
.../plan/nodes/dataset/DataSetReduce.scala | 7 +- .../table/plan/nodes/dataset/DataSetRel.scala | 8 +- .../table/plan/nodes/dataset/DataSetSort.scala | 11 +- .../plan/nodes/dataset/DataSetSource.scala | 190 ++--- .../table/plan/nodes/dataset/DataSetUnion.scala | 7 +- .../plan/rules/dataset/DataSetFilterRule.scala | 50 +- .../plan/rules/dataset/DataSetProjectRule.scala | 38 +- .../runtime/ExpressionAggregateFunction.scala | 100 --- .../runtime/ExpressionFilterFunction.scala | 50 -- .../table/runtime/ExpressionJoinFunction.scala | 57 -- .../runtime/ExpressionSelectFunction.scala | 56 -- .../flink/api/table/runtime/FlatMapRunner.scala | 51 ++ .../api/table/runtime/FunctionCompiler.scala| 35 + .../flink/api/table/runtime/MapRunner.scala | 50 ++ .../flink/api/table/runtime/package.scala | 23 - .../api/table/typeinfo/RenameOperator.scala | 36 - .../table/typeinfo/RenamingProxyTypeInfo.scala | 143 .../flink/api/table/typeinfo/RowTypeInfo.scala | 23 +- .../api/java/table/test/AggregationsITCase.java | 1 - .../flink/api/java/table/test/AsITCase.java | 4 +- .../api/java/table/test/CastingITCase.java | 13 +- .../api/java/table/test/ExpressionsITCase.java | 20 +- .../flink/api/java/table/test/FilterITCase.java | 26 +- .../table/test/GroupedAggregationsITCase.java | 1 - .../flink/api/java/table/test/JoinITCase.java | 1 - .../api/java/table/test/PojoGroupingITCase.java | 4 +- .../flink/api/java/table/test/SelectITCase.java | 22 +- .../table/test/StringExpressionsITCase.java | 7 +- .../flink/api/java/table/test/UnionITCase.java | 1 - .../api/scala/table/test/CastingITCase.scala| 11 +- .../scala/table/test/ExpressionsITCase.scala| 33 +- .../api/scala/table/test/FilterITCase.scala | 59 +- .../api/scala/table/test/SelectITCase.scala | 27 +- .../table/test/StringExpressionsITCase.scala| 6 +- .../api/table/test/TableProgramsTestBase.scala | 97 +++ .../typeinfo/RenamingProxyTypeInfoTest.scala| 75 -- .../api/table/typeinfo/RowComparatorTest.scala | 3 +- 
.../api/table/typeinfo/RowSerializerTest.scala | 20 +- 64 files changed, 2199 insertions(+), 2146 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scal
[12/50] [abbrv] flink git commit: [FLINK-3366] Rename @Experimental annotation to @PublicEvolving
http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala -- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 736c41b..04c1980 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -18,7 +18,7 @@ package org.apache.flink.streaming.api.scala -import org.apache.flink.annotation.{Experimental, Public} +import org.apache.flink.annotation.{PublicEvolving, Public} import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner} import org.apache.flink.api.common.io.OutputFormat import org.apache.flink.api.common.typeinfo.TypeInformation @@ -50,6 +50,7 @@ class DataStream[T](stream: JavaStream[T]) { /** * Returns the [[StreamExecutionEnvironment]] associated with the current [[DataStream]]. + * * @return associated execution environment */ def getExecutionEnvironment: StreamExecutionEnvironment = @@ -60,7 +61,7 @@ class DataStream[T](stream: JavaStream[T]) { * * @return ID of the DataStream */ - @Experimental + @PublicEvolving def getId = stream.getId /** @@ -128,7 +129,7 @@ class DataStream[T](stream: JavaStream[T]) { * @param uid The unique user-specified ID of this transformation. * @return The operator with the specified ID. */ - @Experimental + @PublicEvolving def uid(uid: String) : DataStream[T] = javaStream match { case stream : SingleOutputStreamOperator[T,_] => stream.uid(uid) case _ => throw new UnsupportedOperationException("Only supported for operators.") @@ -142,7 +143,7 @@ class DataStream[T](stream: JavaStream[T]) { * however it is not advised for performance considerations. 
* */ - @Experimental + @PublicEvolving def disableChaining(): DataStream[T] = { stream match { case ds: SingleOutputStreamOperator[_, _] => ds.disableChaining(); @@ -158,7 +159,7 @@ class DataStream[T](stream: JavaStream[T]) { * previous tasks even if possible. * */ - @Experimental + @PublicEvolving def startNewChain(): DataStream[T] = { stream match { case ds: SingleOutputStreamOperator[_, _] => ds.startNewChain(); @@ -175,7 +176,7 @@ class DataStream[T](stream: JavaStream[T]) { * All subsequent operators are assigned to the default resource group. * */ - @Experimental + @PublicEvolving def isolateResources(): DataStream[T] = { stream match { case ds: SingleOutputStreamOperator[_, _] => ds.isolateResources(); @@ -196,7 +197,7 @@ class DataStream[T](stream: JavaStream[T]) { * degree of parallelism for the operators must be decreased from the * default. */ - @Experimental + @PublicEvolving def startNewResourceGroup(): DataStream[T] = { stream match { case ds: SingleOutputStreamOperator[_, _] => ds.startNewResourceGroup(); @@ -345,14 +346,14 @@ class DataStream[T](stream: JavaStream[T]) { * the first instance of the next processing operator. Use this setting with care * since it might cause a serious performance bottleneck in the application. */ - @Experimental + @PublicEvolving def global: DataStream[T] = stream.global() /** * Sets the partitioning of the DataStream so that the output tuples * are shuffled to the next component. */ - @Experimental + @PublicEvolving def shuffle: DataStream[T] = stream.shuffle() /** @@ -385,7 +386,7 @@ class DataStream[T](stream: JavaStream[T]) { * In cases where the different parallelisms are not multiples of each other one or several * downstream operations will have a differing number of inputs from upstream operations. 
*/ - @Experimental + @PublicEvolving def rescale: DataStream[T] = stream.rescale() /** @@ -408,7 +409,7 @@ class DataStream[T](stream: JavaStream[T]) { * the keepPartitioning flag to true * */ - @Experimental + @PublicEvolving def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]), maxWaitTimeMillis:Long = 0, keepPartitioning: Boolean = false) : DataStream[R] = { @@ -438,7 +439,7 @@ class DataStream[T](stream: JavaStream[T]) { * to 0 then the iteration sources will indefinitely, so the job must be killed to stop. * */ - @Experimental + @PublicEvolving def iterate[R, F: TypeInformation: ClassTag](stepFunction: ConnectedStreams[T, F] => (DataStream[F], DataStream[R]), maxWaitTimeMillis:Long): DataStream[R] = { val feedbackType: TypeInformation[F] = implicitly[TypeInformation[F]] @@ -625,7 +626,7 @@ class Dat
[38/50] [abbrv] flink git commit: [FLINK-3225] Enforce translation to DataSetNodes
[FLINK-3225] Enforce translation to DataSetNodes Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3cb76fcb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3cb76fcb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3cb76fcb Branch: refs/heads/tableOnCalcite Commit: 3cb76fcbcc997e975ac7f1589c9f65d83dfd0137 Parents: fe5e406 Author: Fabian Hueske Authored: Mon Feb 1 23:45:16 2016 +0100 Committer: Fabian Hueske Committed: Fri Feb 12 11:34:09 2016 +0100 -- .../flink/api/java/table/JavaBatchTranslator.scala | 16 +--- 1 file changed, 9 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/3cb76fcb/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala index 66bfbe7..7e91190 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala @@ -19,14 +19,14 @@ package org.apache.flink.api.java.table import org.apache.calcite.plan.{RelTraitSet, RelOptUtil} -import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.{RelCollations, RelNode} import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.Programs import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.{DataSet => JavaDataSet} import org.apache.flink.api.table.plan._ import org.apache.flink.api.table.Table -import org.apache.flink.api.table.plan.nodes.dataset.DataSetRel +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetRel} import org.apache.flink.api.table.plan.rules.FlinkRuleSets import 
org.apache.flink.api.table.plan.schema.DataSetTable @@ -61,20 +61,19 @@ class JavaBatchTranslator extends PlanTranslator { // get the planner for the plan val planner = lPlan.getCluster.getPlanner -// we do not have any special requirements for the output -val outputProps = RelTraitSet.createEmpty() println("---") println("Input Plan:") println("---") println(RelOptUtil.toString(lPlan)) - + // decorrelate val decorPlan = RelDecorrelator.decorrelateQuery(lPlan) // optimize the logical Flink plan val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES) -val optPlan = optProgram.run(planner, decorPlan, outputProps) +val flinkOutputProps = RelTraitSet.createEmpty() +val optPlan = optProgram.run(planner, decorPlan, flinkOutputProps) println("---") println("Optimized Plan:") @@ -83,7 +82,10 @@ class JavaBatchTranslator extends PlanTranslator { // optimize the logical Flink plan val dataSetProgram = Programs.ofRules(FlinkRuleSets.DATASET_TRANS_RULES) -val dataSetPlan = dataSetProgram.run(planner, optPlan, outputProps) +val dataSetOutputProps = RelTraitSet.createEmpty() + .plus(DataSetConvention.INSTANCE) + .plus(RelCollations.of()).simplify() +val dataSetPlan = dataSetProgram.run(planner, optPlan, dataSetOutputProps) println("-") println("DataSet Plan:")
[24/50] [abbrv] flink git commit: [FLINK-3107] [runtime] Start checkpoint ID counter with periodic scheduler
[FLINK-3107] [runtime] Start checkpoint ID counter with periodic scheduler Problem: The job manager enables checkpoints during submission of streaming programs. This can lead to a call to `ZooKeeperCheckpointIDCounter.start()`, which communicates with ZooKeeper. This can block the job manager actor. Solution: Start the counter in the `CheckpointCoordinatorDeActivator`. This closes #1610. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8df0bbac Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8df0bbac Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8df0bbac Branch: refs/heads/tableOnCalcite Commit: 8df0bbacb8712342471c12cbf765a0a92b70abc9 Parents: 6968a57 Author: Ufuk Celebi Authored: Tue Feb 9 16:06:46 2016 +0100 Committer: Ufuk Celebi Committed: Wed Feb 10 19:51:59 2016 +0100 -- .../flink/runtime/checkpoint/CheckpointCoordinator.java | 12 ++-- 1 file changed, 10 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/8df0bbac/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 9963a20..b0e23d6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -197,9 +197,9 @@ public class CheckpointCoordinator { this.completedCheckpointStore = checkNotNull(completedCheckpointStore); this.recentPendingCheckpoints = new ArrayDeque(NUM_GHOST_CHECKPOINT_IDS); this.userClassLoader = userClassLoader; - this.checkpointIdCounter = checkNotNull(checkpointIDCounter); - checkpointIDCounter.start(); + // Started with the periodic scheduler + this.checkpointIdCounter = 
checkNotNull(checkpointIDCounter); this.timer = new Timer("Checkpoint Timer", true); @@ -862,6 +862,14 @@ public class CheckpointCoordinator { // make sure all prior timers are cancelled stopCheckpointScheduler(); + try { + // Multiple start calls are OK + checkpointIdCounter.start(); + } catch (Exception e) { + String msg = "Failed to start checkpoint ID counter: " + e.getMessage(); + throw new RuntimeException(msg, e); + } + periodicScheduling = true; currentPeriodicTrigger = new ScheduledTrigger(); timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval);
[32/50] [abbrv] flink git commit: [FLINK-3358] [FLINK-3351] [rocksdb] Add simple constructors and automatic temp path configuration
[FLINK-3358] [FLINK-3351] [rocksdb] Add simple constructors and automatic temp path configuration This adds constructors that only take a backup dir URI and use it to initialize both the RocksDB file backups and the FileSystem state backend for non-partitioned state. Also, the RocksDBStateBackend now automatically picks up the TaskManager's temp directories, if no local storage directories are explicitly configured. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/edae7934 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/edae7934 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/edae7934 Branch: refs/heads/tableOnCalcite Commit: edae79340dd486915d25109cbdc1485accae665a Parents: be72758 Author: Stephan Ewen Authored: Thu Feb 11 21:30:36 2016 +0100 Committer: Stephan Ewen Committed: Thu Feb 11 21:34:03 2016 +0100 -- .../streaming/state/RocksDBStateBackend.java| 241 ++-- .../state/RocksDBStateBackendConfigTest.java| 280 +++ .../state/RocksDBStateBackendTest.java | 4 +- .../state/filesystem/FsStateBackend.java| 99 --- .../EventTimeWindowCheckpointingITCase.java | 13 +- 5 files changed, 566 insertions(+), 71 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/edae7934/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java -- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index eddd8c0..5b16e86 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -18,7 +18,12 @@ package 
org.apache.flink.contrib.streaming.state; import java.io.File; +import java.io.IOException; import java.io.Serializable; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ListState; @@ -28,13 +33,17 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.api.common.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.rocksdb.Options; import org.rocksdb.StringAppendOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static java.util.Objects.requireNonNull; @@ -54,20 +63,35 @@ import static java.util.Objects.requireNonNull; public class RocksDBStateBackend extends AbstractStateBackend { private static final long serialVersionUID = 1L; - /** Base path for RocksDB directory. */ - private final String dbBasePath; - - /** The checkpoint directory that we snapshot RocksDB backups to. */ - private final String checkpointDirectory; + private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class); + + + /** The checkpoint directory that copy the RocksDB backups to. */ + private final Path checkpointDirectory; + /** The state backend that stores the non-partitioned state */ + private final AbstractStateBackend nonPartitionedStateBackend; + + /** Operator identifier that is used to uniqueify the RocksDB storage path. */ private String operatorIdentifier; /** JobID for uniquifying backup paths. 
*/ private JobID jobId; + - private AbstractStateBackend backingStateBackend; + // DB storage directories + + /** Base paths for RocksDB directory, as configured. May be null. */ + private Path[] dbBasePaths; + /** Base paths for RocksDB directory, as initialized */ + private File[] dbStorageDirectories; + + private int nextDirectory; + + // RocksDB options + /** The pre-configured option settings */ private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT; @@ -79,31 +103,112 @@ public class RocksDBS
[26/50] [abbrv] flink git commit: [FLINK-3384] [kafka] Add ClosableQueue for message exchanges between Kafka Threads
[FLINK-3384] [kafka] Add ClosableQueue for message exchanges between Kafka Threads Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd324ea7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd324ea7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd324ea7 Branch: refs/heads/tableOnCalcite Commit: fd324ea72979cc3d4202ffa3ea174ec4cc9d153b Parents: 50bd65a Author: Stephan Ewen Authored: Wed Feb 10 14:51:10 2016 +0100 Committer: Stephan Ewen Committed: Wed Feb 10 22:15:32 2016 +0100 -- .../kafka/internals/ClosableBlockingQueue.java | 502 +++ .../internals/ClosableBlockingQueueTest.java| 603 +++ 2 files changed, 1105 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/fd324ea7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java -- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java new file mode 100644 index 000..856c2ad --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import static java.util.Objects.requireNonNull; + +/** + * A special form of blocking queue with two additions: + * + * The queue can be closed atomically when empty. Adding elements after the queue + * is closed fails. This allows queue consumers to atomically discover that no elements + * are available and mark themselves as shut down. + * The queue allows to poll batches of elements in one polling call. + * + * + * The queue has no capacity restriction and is safe for multiple producers and consumers. + * + * Note: Null elements are prohibited. + * + * @param The type of elements in the queue. + */ +public class ClosableBlockingQueue { + + /** The lock used to make queue accesses and open checks atomic */ + private final ReentrantLock lock; + + /** The condition on which blocking get-calls wait if the queue is empty */ + private final Condition nonEmpty; + + /** The deque of elements */ + private final ArrayDeque elements; + + /** Flag marking the status of the queue */ + private volatile boolean open; + + // + + /** +* Creates a new empty queue. 
+*/ + public ClosableBlockingQueue() { + this(10); + } + + /** +* Creates a new empty queue, reserving space for at least the specified number +* of elements. The queue can still grow, if more elements are added than the +* reserved space. +* +* @param initialSize The number of elements to reserve space for. +*/ + public ClosableBlockingQueue(int initialSize) { + this.lock = new ReentrantLock(true); + this.nonEmpty = this.lock.newCondition(); + + this.elements = new ArrayDeque<>(initialSize); + this.open = true; + + + } + + /** +* Creates a new queue that contains the given eleme
[04/50] [abbrv] flink git commit: [hotfix] Update comments in 'ChainingStrategy' and remove outdated 'FORCE_ALWAYS' constant
[hotfix] Update comments in 'ChainingStrategy' and remove outdated 'FORCE_ALWAYS' constant Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e9c83ea2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e9c83ea2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e9c83ea2 Branch: refs/heads/tableOnCalcite Commit: e9c83ea2c36decf02d0ea9c2b76b0fd50606b51b Parents: 28feede Author: Stephan Ewen Authored: Mon Feb 8 15:08:09 2016 +0100 Committer: Stephan Ewen Committed: Mon Feb 8 20:19:25 2016 +0100 -- .../api/graph/StreamingJobGraphGenerator.java | 9 ++ .../api/operators/ChainingStrategy.java | 30 +++- 2 files changed, 19 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/e9c83ea2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index fd75ba7..c0d2856 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -398,15 +398,12 @@ public class StreamingJobGraphGenerator { && headOperator != null && upStreamVertex.getSlotSharingID() == downStreamVertex.getSlotSharingID() && upStreamVertex.getSlotSharingID() != -1 - && (outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS || - outOperator.getChainingStrategy() == ChainingStrategy.FORCE_ALWAYS) + && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || - headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS || - headOperator.getChainingStrategy() == ChainingStrategy.FORCE_ALWAYS) + 
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS) && (edge.getPartitioner() instanceof ForwardPartitioner) && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() - && (streamGraph.isChainingEnabled() || - outOperator.getChainingStrategy() == ChainingStrategy.FORCE_ALWAYS); + && streamGraph.isChainingEnabled(); } private void setSlotSharing() { http://git-wip-us.apache.org/repos/asf/flink/blob/e9c83ea2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java index 18e8858..1bf3259 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java @@ -18,23 +18,22 @@ package org.apache.flink.streaming.api.operators; - /** - * Defines the chaining scheme for the operator. - * By default {@link #ALWAYS} is used, which means operators will be eagerly chained whenever possible. + * Defines the chaining scheme for the operator. When an operator is chained to the + * predecessor, it means that they run in the same thread. They become one operator + * consisting of multiple steps. + * + * The default value used by the {@link StreamOperator} is {@link #HEAD}, which means that + * the operator is not chained to its predecessor. Most operators override this with + * {@link #ALWAYS}, meaning they will be chained to predecessors whenever possible. */ public enum ChainingStrategy { - /** -* Chaining will happen even if chaining is disabled on the execution environment. -* This should only be used by system-level operators, not operators implemented by users. 
-*/ - FORCE_ALWAYS, - /** -* Operators will be eagerly chained whenever possible, for -* maximal performance. It is generally a good practice to allow maximal -
[10/50] [abbrv] flink git commit: [FLINK-3373] [build] Revert HTTP Components versions because of incompatibility with Hadoop >= 2.6.0
[FLINK-3373] [build] Revert HTTP Components versions because of incompatibility with Hadoop >= 2.6.0 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de231d74 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de231d74 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de231d74 Branch: refs/heads/tableOnCalcite Commit: de231d74650600ae84a76ee267aeab6ddd0ff596 Parents: 72f8228 Author: Stephan Ewen Authored: Tue Feb 9 20:38:43 2016 +0100 Committer: Stephan Ewen Committed: Tue Feb 9 20:38:43 2016 +0100 -- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/de231d74/pom.xml -- diff --git a/pom.xml b/pom.xml index 81528c8..0a64c4a 100644 --- a/pom.xml +++ b/pom.xml @@ -344,13 +344,13 @@ under the License. <groupId>org.apache.httpcomponents</groupId> <artifactId>httpcore</artifactId> - <version>4.4.4</version> + <version>4.2.5</version> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> - <version>4.5.1</version> + <version>4.2.6</version>
[30/50] [abbrv] flink git commit: [FLINK-3271] [build] Don't exclude jetty-util dependency from the Hadoop dependencies
[FLINK-3271] [build] Don't exclude jetty-util dependency from the Hadoop dependencies This closes #1543 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/82c7383a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/82c7383a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/82c7383a Branch: refs/heads/tableOnCalcite Commit: 82c7383a67a8349d05dc98780667b3a47ab3cc54 Parents: dcea46e Author: Abhishek Agarwal Authored: Sun Jan 24 20:22:45 2016 +0530 Committer: Stephan Ewen Committed: Thu Feb 11 21:34:03 2016 +0100 -- .../flink-shaded-hadoop1/pom.xml| 12 -- .../flink-shaded-hadoop2/pom.xml| 41 +--- 2 files changed, 1 insertion(+), 52 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/82c7383a/flink-shaded-hadoop/flink-shaded-hadoop1/pom.xml -- diff --git a/flink-shaded-hadoop/flink-shaded-hadoop1/pom.xml b/flink-shaded-hadoop/flink-shaded-hadoop1/pom.xml index 15082aa..e8634f5 100644 --- a/flink-shaded-hadoop/flink-shaded-hadoop1/pom.xml +++ b/flink-shaded-hadoop/flink-shaded-hadoop1/pom.xml @@ -69,10 +69,6 @@ under the License. jsp-2.1 - org.mortbay.jetty - jetty-util - - org.eclipse.jdt core @@ -81,14 +77,6 @@ under the License. servlet-api - org.mortbay.jetty - jetty - - - org.mortbay.jetty - jetty-util - - com.sun.jersey jersey-core http://git-wip-us.apache.org/repos/asf/flink/blob/82c7383a/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml -- diff --git a/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml b/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml index 5eb8043..c642653 100644 --- a/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml +++ b/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml @@ -68,10 +68,7 @@ under the License. org.mortbay.jetty jsp-2.1 - - org.mortbay.jetty - jetty-util - + org.eclipse.jdt core @@ -81,10 +78,6 @@ under the License. jetty - org.mortbay.jetty - jetty-util - - com.sun.jersey jersey-json @@ -185,10 +178,6 @@ under the License. 
jsp-2.1 - org.mortbay.jetty - jetty-util - - org.eclipse.jdt core @@ -197,10 +186,6 @@ under the License. jetty - org.mortbay.jetty - jetty-util - - com.sun.jersey jersey-json @@ -301,10 +286,6 @@ under the License. jsp-2.1 - org.mortbay.jetty -
[22/50] [abbrv] flink git commit: [hotfix] [tests] Reset state to allow retry on failure
[hotfix] [tests] Reset state to allow retry on failure This closes #1611 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48b74546 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48b74546 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48b74546 Branch: refs/heads/tableOnCalcite Commit: 48b745466202ebbb68608930e13cb6ed4a35e6e7 Parents: 756cbaf Author: Ufuk Celebi Authored: Tue Feb 9 12:45:41 2016 +0100 Committer: Stephan Ewen Committed: Wed Feb 10 15:27:41 2016 +0100 -- .../JobManagerCheckpointRecoveryITCase.java | 19 --- 1 file changed, 12 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/48b74546/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java -- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java index 59a05ff..ea30c58 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java @@ -116,15 +116,15 @@ public class JobManagerCheckpointRecoveryITCase extends TestLogger { private static final int Parallelism = 8; - private static final CountDownLatch CompletedCheckpointsLatch = new CountDownLatch(2); + private static CountDownLatch CompletedCheckpointsLatch = new CountDownLatch(2); - private static final AtomicLongArray RecoveredStates = new AtomicLongArray(Parallelism); + private static AtomicLongArray RecoveredStates = new AtomicLongArray(Parallelism); - private static final CountDownLatch FinalCountLatch = new CountDownLatch(1); + private static CountDownLatch FinalCountLatch = new CountDownLatch(1); - private static final AtomicReference FinalCount = new AtomicReference<>(); + private static 
AtomicReference FinalCount = new AtomicReference<>(); - private static final long LastElement = -1; + private static long LastElement = -1; /** * Simple checkpointed streaming sum. @@ -156,7 +156,6 @@ public class JobManagerCheckpointRecoveryITCase extends TestLogger { Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(ZooKeeper .getConnectString(), FileStateBackendBasePath.getAbsoluteFile().toURI().toString()); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Parallelism); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s"); ActorSystem testSystem = null; JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2]; @@ -248,6 +247,13 @@ public class JobManagerCheckpointRecoveryITCase extends TestLogger { } } catch (Throwable t) { + // Reset all static state for test retries + CompletedCheckpointsLatch = new CountDownLatch(2); + RecoveredStates = new AtomicLongArray(Parallelism); + FinalCountLatch = new CountDownLatch(1); + FinalCount = new AtomicReference<>(); + LastElement = -1; + // Print early (in some situations the process logs get too big // for Travis and the root problem is not shown) t.printStackTrace(); @@ -303,7 +309,6 @@ public class JobManagerCheckpointRecoveryITCase extends TestLogger { fileStateBackendPath); config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 2); - config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s"); JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2]; LeaderRetrievalService leaderRetrievalService = null;
[41/50] [abbrv] flink git commit: [FLINK-3225] Implemented optimization of Table API queries via Calcite
http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operators/DataSetTable.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operators/DataSetTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operators/DataSetTable.scala deleted file mode 100644 index 65b97fb..000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operators/DataSetTable.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.table.plan.operators - -import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} -import org.apache.calcite.schema.impl.AbstractTable -import org.apache.calcite.sql.`type`.SqlTypeName -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.plan.TypeConverter - -class DataSetTable[T]( - val dataSet: DataSet[T], - val fieldNames: Array[String]) extends AbstractTable { - - // check uniquenss of field names - if (fieldNames.length != fieldNames.toSet.size) { -throw new scala.IllegalArgumentException( - "Table field names must be unique.") - } - - val dataSetType: CompositeType[T] = -dataSet.getType match { - case cType: CompositeType[T] => -cType - case _ => -throw new scala.IllegalArgumentException( - "DataSet must have a composite type.") -} - - val fieldTypes: Array[SqlTypeName] = -if (fieldNames.length == dataSetType.getArity) { - (0 until dataSetType.getArity) -.map(i => dataSetType.getTypeAt(i)) -.map(TypeConverter.typeInfoToSqlType) -.toArray -} -else { - throw new IllegalArgumentException( -"Arity of DataSet type not equal to number of field names.") -} - - override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { -val builder = typeFactory.builder -fieldNames.zip(fieldTypes) - .foreach( f => builder.add(f._1, f._2).nullable(true) ) -builder.build - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala new file mode 100644 index 000..97e8b32 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala @@ -0,0 +1,120 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules + +import org.apache.calcite.rel.rules._ +import org.apache.calcite.tools.{RuleSets, RuleSet} +import org.apache.flink.api.table.plan.rules.logical._ +import org.apache.flink.api.table.plan.rules.dataset._ + +object FlinkRuleSets { + + /** +* RuleSet to optimize plans for batch / DataSet execution +*/ + val DATASET_OPT_RULES: RuleSet = RuleSets.ofList( + +// filter rules +FilterJoinRule.FILTER_ON_JOIN, +FilterJoinRule.JOIN, +Fil
[48/50] [abbrv] flink git commit: [FLINK-3282] add FlinkRelNode interface.
[FLINK-3282] add FlinkRelNode interface. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/20235e0a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/20235e0a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/20235e0a Branch: refs/heads/tableOnCalcite Commit: 20235e0afaf5de799e71094d72ae7e47337ea82d Parents: 4abca1d Author: chengxiang li Authored: Wed Jan 27 11:37:43 2016 +0800 Committer: Fabian Hueske Committed: Fri Feb 12 11:34:09 2016 +0100 -- .../api/table/sql/calcite/DataSetRelNode.java | 29 +++ .../table/sql/calcite/node/DataSetExchange.java | 60 +++ .../table/sql/calcite/node/DataSetFlatMap.java | 56 ++ .../api/table/sql/calcite/node/DataSetJoin.java | 80 .../api/table/sql/calcite/node/DataSetMap.java | 58 ++ .../table/sql/calcite/node/DataSetReduce.java | 58 ++ .../sql/calcite/node/DataSetReduceGroup.java| 62 +++ .../api/table/sql/calcite/node/DataSetSort.java | 59 +++ .../table/sql/calcite/node/DataSetSource.java | 55 ++ .../table/sql/calcite/node/DataSetUnion.java| 51 + 10 files changed, 568 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/20235e0a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/DataSetRelNode.java -- diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/DataSetRelNode.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/DataSetRelNode.java new file mode 100644 index 000..df0ebc0 --- /dev/null +++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/DataSetRelNode.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.sql.calcite; + +import org.apache.calcite.rel.RelNode; +import org.apache.flink.api.java.DataSet; + +public interface DataSetRelNode extends RelNode { + + /** +* Translate the FlinkRelNode into Flink operator. +*/ + DataSet translateToPlan(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/20235e0a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetExchange.java -- diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetExchange.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetExchange.java new file mode 100644 index 000..1ddd884 --- /dev/null +++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetExchange.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.sql.calcite.node; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.SingleRel; +import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod; +import org.apa
[05/50] [abbrv] flink git commit: [hotfix] Cleanup routing of records in OperatorChain
[hotfix] Cleanup routing of records in OperatorChain Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28c6254e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28c6254e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28c6254e Branch: refs/heads/tableOnCalcite Commit: 28c6254ee385fe746e868a81b2207bf66b552174 Parents: e9c83ea Author: Stephan Ewen Authored: Mon Feb 8 16:14:00 2016 +0100 Committer: Stephan Ewen Committed: Mon Feb 8 20:36:35 2016 +0100 -- .../BroadcastOutputSelectorWrapper.java | 45 --- .../api/collector/selector/DirectedOutput.java | 130 +++ .../selector/DirectedOutputSelectorWrapper.java | 97 -- .../selector/OutputSelectorWrapper.java | 9 +- .../selector/OutputSelectorWrapperFactory.java | 33 - .../flink/streaming/api/graph/StreamConfig.java | 20 +-- .../flink/streaming/api/graph/StreamNode.java | 10 +- .../api/graph/StreamingJobGraphGenerator.java | 2 +- .../streaming/runtime/io/CollectorWrapper.java | 61 - .../streaming/runtime/tasks/OperatorChain.java | 84 ++-- .../flink/streaming/api/OutputSplitterTest.java | 2 +- .../runtime/tasks/StreamTaskTestHarness.java| 9 +- 12 files changed, 225 insertions(+), 277 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java deleted file mode 100644 index 7034b11..000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.collector.selector; - -import java.util.ArrayList; - -import org.apache.flink.streaming.api.graph.StreamEdge; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.util.Collector; - -public class BroadcastOutputSelectorWrapper<OUT> implements OutputSelectorWrapper<OUT> { - - private static final long serialVersionUID = 1L; - - private final ArrayList<Collector<StreamRecord<OUT>>> outputs; - - public BroadcastOutputSelectorWrapper() { - outputs = new ArrayList<Collector<StreamRecord<OUT>>>(); - } - - @Override - public void addCollector(Collector<StreamRecord<OUT>> output, StreamEdge edge) { - outputs.add(output); - } - - @Override - public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record) { - return outputs; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java new file mode 100644 index 000..52c50b3 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java @@ -0,0 +1,130 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 +
[02/50] [abbrv] flink git commit: [hotfix] typo IllegalArgumentException
[hotfix] typo IllegalArgumentException This closes #1602 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c47f385 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c47f385 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c47f385 Branch: refs/heads/tableOnCalcite Commit: 5c47f3854938fc789ac0fd0867451f4def155c90 Parents: d51bec1 Author: Andrea Sella Authored: Mon Feb 8 16:38:38 2016 +0100 Committer: Stephan Ewen Committed: Mon Feb 8 20:18:19 2016 +0100 -- .../java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/5c47f385/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java -- diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java index 84eb309..132edc4 100644 --- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java +++ b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java @@ -357,10 +357,10 @@ public class JDBCInputFormat extends RichInputFormat
[49/50] [abbrv] flink git commit: [FLINK-3226] Add DataSet scan and conversion to DataSet[Row]
[FLINK-3226] Add DataSet scan and conversion to DataSet[Row] This closes #1579. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99f60c84 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99f60c84 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99f60c84 Branch: refs/heads/tableOnCalcite Commit: 99f60c84092a5c656591d68f4d4450f14f88b9ba Parents: 3cb76fc Author: Fabian Hueske Authored: Tue Feb 2 17:15:28 2016 +0100 Committer: Fabian Hueske Committed: Fri Feb 12 11:34:09 2016 +0100 -- .../api/java/table/JavaBatchTranslator.scala| 2 + .../api/scala/table/ScalaBatchTranslator.scala | 8 +- .../flink/api/table/plan/PlanTranslator.scala | 87 --- .../flink/api/table/plan/TypeConverter.scala| 15 ++ .../plan/nodes/dataset/DataSetSource.scala | 150 ++- .../plan/rules/dataset/DataSetScanRule.scala| 8 +- .../api/table/plan/schema/DataSetTable.scala| 41 ++--- .../flink/api/java/table/test/AsITCase.java | 63 ++-- .../flink/api/java/table/test/FilterITCase.java | 15 +- .../flink/api/java/table/test/SelectITCase.java | 15 +- .../flink/api/scala/table/test/AsITCase.scala | 40 - .../api/scala/table/test/SelectITCase.scala | 28 ++-- 12 files changed, 380 insertions(+), 92 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/99f60c84/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala index 7e91190..f70f477 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala @@ -40,11 +40,13 @@ class JavaBatchTranslator extends PlanTranslator { override def createTable[A]( repr: 
Representation[A], + fieldIndexes: Array[Int], fieldNames: Array[String]): Table = { // create table representation from DataSet val dataSetTable = new DataSetTable[A]( repr.asInstanceOf[JavaDataSet[A]], + fieldIndexes, fieldNames ) http://git-wip-us.apache.org/repos/asf/flink/blob/99f60c84/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala index 1c453fa..cc92c37 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala @@ -38,8 +38,12 @@ class ScalaBatchTranslator extends PlanTranslator { type Representation[A] = DataSet[A] - override def createTable[A](repr: Representation[A], fieldNames: Array[String]): Table = { -javaTranslator.createTable(repr.javaSet, fieldNames) + override def createTable[A]( +repr: Representation[A], +fieldIndexes: Array[Int], +fieldNames: Array[String]): Table = + { +javaTranslator.createTable(repr.javaSet, fieldIndexes, fieldNames) } override def translate[O](op: RelNode)(implicit tpe: TypeInformation[O]): DataSet[O] = { http://git-wip-us.apache.org/repos/asf/flink/blob/99f60c84/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala index 4e97f83..af22768 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala @@ -18,10 +18,12 @@ package org.apache.flink.api.table.plan import org.apache.calcite.rel.RelNode -import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
[17/50] [abbrv] flink git commit: [FLINK-3373] [build] Shade away Hadoop's HTTP Components dependency
[FLINK-3373] [build] Shade away Hadoop's HTTP Components dependency This closes #1615 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ccd7544 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ccd7544 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ccd7544 Branch: refs/heads/tableOnCalcite Commit: 8ccd7544edb25e82cc8a898809cc7c8bb7893620 Parents: aeee6ef Author: Stephan Ewen Authored: Tue Feb 9 21:18:43 2016 +0100 Committer: Stephan Ewen Committed: Wed Feb 10 15:01:22 2016 +0100 -- flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml | 15 +++ flink-shaded-hadoop/pom.xml | 13 + pom.xml | 12 3 files changed, 28 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/8ccd7544/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml -- diff --git a/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml b/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml index b5839d9..5eb8043 100644 --- a/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml +++ b/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml @@ -652,4 +652,19 @@ under the License. + + + + org.apache.httpcomponents + httpcore + 4.2.5 + + + + org.apache.httpcomponents + httpclient + 4.2.6 + + + http://git-wip-us.apache.org/repos/asf/flink/blob/8ccd7544/flink-shaded-hadoop/pom.xml -- diff --git a/flink-shaded-hadoop/pom.xml b/flink-shaded-hadoop/pom.xml index 7d54ef9..d5a8529 100644 --- a/flink-shaded-hadoop/pom.xml +++ b/flink-shaded-hadoop/pom.xml @@ -111,6 +111,11 @@ under the License. io.netty:netty:* org.apache.curator:* org.apache.hadoop:* + + + net.java.dev.jets3t:jets3t + org.apache.httpcomponents:* + commons-httpclient:commons-httpclient @@ -133,6 +138,14 @@ under the License. 
org.apache.curator org.apache.flink.hadoop.shaded.org.apache.curator + + org.apache.http + org.apache.flink.hadoop.shaded.org.apache.http + + + org.apache.commons.httpclient + org.apache.flink.hadoop.shaded.org.apache.commons.httpclient + http://git-wip-us.apache.org/repos/asf/flink/blob/8ccd7544/pom.xml -- diff --git a/pom.xml b/pom.xml index 0a64c4a..42ebf79 100644 --- a/pom.xml +++ b/pom.xml @@ -342,18 +342,6 @@ under the License. - org.apache.httpcomponents - httpcore - 4.2.5 - - - - org.apache.httpcomponents - httpclient - 4.2.6 - - -
[47/50] [abbrv] flink git commit: [FLINK-3226] implement GroupReduce translation; enable tests for supported operations
[FLINK-3226] implement GroupReduce translation; enable tests for supported operations Squashes the following commits: - Compute average as sum and count for byte, short and int type to avoid rounding errors - Move aggregation functions to org.apache.flink.table.runtime - Remove join-related changes - Change integer average aggregations to maintain sum and count - Long average uses a BigInteger sum Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/18e7f2f4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/18e7f2f4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/18e7f2f4 Branch: refs/heads/tableOnCalcite Commit: 18e7f2f4e0654ad2f00dcf7d1351846345accb13 Parents: 6dd2d77 Author: vasia Authored: Thu Feb 4 15:53:52 2016 +0100 Committer: Fabian Hueske Committed: Fri Feb 12 11:34:09 2016 +0100 -- .../flink/api/table/plan/TypeConverter.scala| 2 +- .../plan/functions/AggregateFunction.scala | 71 - .../table/plan/functions/FunctionUtils.scala| 37 - .../plan/functions/aggregate/Aggregate.scala| 42 -- .../functions/aggregate/AggregateFactory.scala | 135 - .../plan/functions/aggregate/AvgAggregate.scala | 148 --- .../functions/aggregate/CountAggregate.scala| 34 - .../plan/functions/aggregate/MaxAggregate.scala | 136 - .../plan/functions/aggregate/MinAggregate.scala | 136 - .../plan/functions/aggregate/SumAggregate.scala | 130 .../plan/nodes/dataset/DataSetGroupReduce.scala | 30 +++- .../table/plan/nodes/dataset/DataSetJoin.scala | 6 +- .../plan/nodes/logical/FlinkAggregate.scala | 16 -- .../api/table/plan/rules/FlinkRuleSets.scala| 3 +- .../rules/dataset/DataSetAggregateRule.scala| 13 +- .../plan/rules/dataset/DataSetJoinRule.scala| 102 + .../api/table/runtime/AggregateFunction.scala | 76 ++ .../api/table/runtime/aggregate/Aggregate.scala | 42 ++ .../runtime/aggregate/AggregateFactory.scala| 136 + .../table/runtime/aggregate/AvgAggregate.scala | 131 .../runtime/aggregate/CountAggregate.scala | 34 
+ .../table/runtime/aggregate/MaxAggregate.scala | 84 +++ .../table/runtime/aggregate/MinAggregate.scala | 86 +++ .../table/runtime/aggregate/SumAggregate.scala | 50 +++ .../api/java/table/test/AggregationsITCase.java | 13 +- .../api/java/table/test/ExpressionsITCase.java | 2 - .../flink/api/java/table/test/FilterITCase.java | 2 - .../table/test/GroupedAggregationsITCase.java | 6 +- .../flink/api/java/table/test/SelectITCase.java | 2 - .../flink/api/java/table/test/UnionITCase.java | 1 - .../scala/table/test/AggregationsITCase.scala | 11 +- .../scala/table/test/ExpressionsITCase.scala| 1 - .../table/test/GroupedAggregationsITCase.scala | 6 +- 33 files changed, 704 insertions(+), 1020 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala index b7cb200..1fc482a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala @@ -135,7 +135,7 @@ object TypeConverter { logicalFieldTypes.head } else { -new TupleTypeInfo[Any](logicalFieldTypes.toArray:_*) +new TupleTypeInfo[Tuple](logicalFieldTypes.toArray:_*) } } } http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala deleted file mode 100644 index 4abf2d2..000 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright o
[29/50] [abbrv] flink git commit: [FLINK-3364] [runtime, yarn] Move SavepointStore initialization out of JobManager constructor
[FLINK-3364] [runtime, yarn] Move SavepointStore initialization out of JobManager constructor This closes #1622. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dcea46e8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dcea46e8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dcea46e8 Branch: refs/heads/tableOnCalcite Commit: dcea46e891a1479205fdfe939858d340cde87d57 Parents: ed7d3da Author: Ufuk Celebi Authored: Thu Feb 11 14:58:35 2016 +0100 Committer: Ufuk Celebi Committed: Thu Feb 11 20:49:28 2016 +0100 -- .../flink/runtime/jobmanager/JobManager.scala | 22 +++- .../JobManagerLeaderElectionTest.java | 6 +- .../runtime/testingUtils/TestingCluster.scala | 6 -- .../testingUtils/TestingJobManager.scala| 8 --- .../runtime/testingUtils/TestingUtils.scala | 3 ++- .../flink/yarn/TestingYarnJobManager.scala | 8 --- .../org/apache/flink/yarn/YarnJobManager.scala | 8 --- 7 files changed, 38 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala -- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index d96575f..78612c0 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -113,7 +113,8 @@ class JobManager( protected val timeout: FiniteDuration, protected val leaderElectionService: LeaderElectionService, protected val submittedJobGraphs : SubmittedJobGraphStore, -protected val checkpointRecoveryFactory : CheckpointRecoveryFactory) +protected val checkpointRecoveryFactory : CheckpointRecoveryFactory, +protected val savepointStore: SavepointStore) extends FlinkActor with LeaderSessionMessageFilter // mixin oder is 
important, we want filtering after logging with LogMessages // mixin order is important, we want first logging @@ -151,9 +152,6 @@ class JobManager( val webMonitorPort : Int = flinkConfiguration.getInteger( ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1) - protected val savepointStore : SavepointStore = -SavepointStoreFactory.createFromConfig(flinkConfiguration) - /** * Run when the job manager is started. Simply logs an informational message. * The method also starts the leader election service. @@ -2040,7 +2038,8 @@ object JobManager { Int, // number of archived jobs LeaderElectionService, SubmittedJobGraphStore, -CheckpointRecoveryFactory) = { +CheckpointRecoveryFactory, +SavepointStore) = { val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration) @@ -2078,8 +2077,6 @@ object JobManager { } } - - var blobServer: BlobServer = null var instanceManager: InstanceManager = null var scheduler: FlinkScheduler = null @@ -2140,6 +2137,8 @@ object JobManager { new ZooKeeperCheckpointRecoveryFactory(client, configuration)) } +val savepointStore = SavepointStoreFactory.createFromConfig(configuration) + (executorService, instanceManager, scheduler, @@ -2150,7 +2149,8 @@ object JobManager { archiveCount, leaderElectionService, submittedJobGraphs, - checkpointRecoveryFactory) + checkpointRecoveryFactory, + savepointStore) } /** @@ -2212,7 +2212,8 @@ object JobManager { archiveCount, leaderElectionService, submittedJobGraphs, -checkpointRecoveryFactory) = createJobManagerComponents( +checkpointRecoveryFactory, +savepointStore) = createJobManagerComponents( configuration, None) @@ -2237,7 +2238,8 @@ object JobManager { timeout, leaderElectionService, submittedJobGraphs, - checkpointRecoveryFactory) + checkpointRecoveryFactory, + savepointStore) val jobManager: ActorRef = jobMangerActorName match { case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName) 
http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java index f50a0a0..73c7646 100644 ---
[21/50] [abbrv] flink git commit: [hotfix] [tests] Log retry rule failures on warn level
[hotfix] [tests] Log retry rule failures on warn level Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/756cbaff Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/756cbaff Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/756cbaff Branch: refs/heads/tableOnCalcite Commit: 756cbafff1fd25f67268ca84b62c8a479156bf88 Parents: 3a643c0 Author: Ufuk Celebi Authored: Tue Feb 9 11:25:37 2016 +0100 Committer: Stephan Ewen Committed: Wed Feb 10 15:27:28 2016 +0100 -- .../test/java/org/apache/flink/testutils/junit/RetryRule.java| 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/756cbaff/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java -- diff --git a/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java b/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java index a4aff86..2b3a37a 100644 --- a/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java +++ b/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java @@ -113,7 +113,7 @@ public class RetryRule implements TestRule { break; // success } catch (Throwable t) { - LOG.debug(String.format("Test run failed (%d/%d).", + LOG.warn(String.format("Test run failed (%d/%d).", currentRun, timesOnFailure + 1), t); // Throw the failure if retried too often @@ -156,7 +156,7 @@ public class RetryRule implements TestRule { break; // success } catch (Throwable t) { - LOG.debug(String.format("Test run failed (%d/%d).", currentRun, timesOnFailure + 1), t); + LOG.warn(String.format("Test run failed (%d/%d).", currentRun, timesOnFailure + 1), t); if (!exceptionClass.isAssignableFrom(t.getClass()) || currentRun >= timesOnFailure) { // Throw the failure if retried too often, or if it is the wrong exception
[44/50] [abbrv] flink git commit: [FLINK-3226] Implement a CodeGenerator for an efficient translation to DataSet programs
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala deleted file mode 100644 index b706e6d..000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.codegen - -import java.io.StringReader - -import org.apache.flink.api.common.functions.FlatJoinFunction -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.table.TableConfig -import org.apache.flink.api.table.codegen.Indenter._ -import org.apache.flink.api.table.expressions.{Expression, NopExpression} -import org.slf4j.LoggerFactory - -/** - * Code generator for assembling the result of a binary operation. 
- */ -class GenerateJoin[L, R, O]( -leftTypeInfo: CompositeType[L], -rightTypeInfo: CompositeType[R], -resultTypeInfo: CompositeType[O], -predicate: Expression, -outputFields: Seq[Expression], -cl: ClassLoader, -config: TableConfig) - extends GenerateResultAssembler[FlatJoinFunction[L, R, O]]( -Seq(("in0", leftTypeInfo), ("in1", rightTypeInfo)), -cl = cl, -config) { - - val LOG = LoggerFactory.getLogger(this.getClass) - - - override protected def generateInternal(): FlatJoinFunction[L, R, O] = { - -val leftTpe = typeTermForTypeInfo(leftTypeInfo) -val rightTpe = typeTermForTypeInfo(rightTypeInfo) -val resultTpe = typeTermForTypeInfo(resultTypeInfo) - - -val resultCode = createResult(resultTypeInfo, outputFields, o => s"coll.collect($o);") - -val generatedName = freshName("GeneratedJoin") - - -val code = predicate match { - case n: NopExpression => -// Janino does not support generics, that's why we need -// manual casting here -if (nullCheck) { - j""" -public class $generatedName -implements org.apache.flink.api.common.functions.FlatFlatJoinFunction { - - ${reuseCode(resultTypeInfo)} - - public org.apache.flink.api.table.TableConfig config = null; - public $generatedName(org.apache.flink.api.table.TableConfig config) { -this.config = config; - } - - public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) { -$leftTpe in0 = ($leftTpe) _in0; -$rightTpe in1 = ($rightTpe) _in1; - -$resultCode - } -} - """ -} else { - j""" -public class $generatedName -implements org.apache.flink.api.common.functions.FlatJoinFunction { - - ${reuseCode(resultTypeInfo)} - - public org.apache.flink.api.table.TableConfig config = null; - public $generatedName(org.apache.flink.api.table.TableConfig config) { -this.config = config; - } - - public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) { -$leftTpe in0 = ($leftTpe) _in0; -$rightTpe in1 = ($rightTpe) _in1; - -$resultCode - } -} - """ -} - - case _ => -val pred = 
generateExpression(predicate) -// Janino does not support generics, that's why we need -// manual casting here -if (nullCheck) { - j""" -public class $generatedName -implements org.apache.flink.api.common.functions.FlatFlatJoinFunction { - - ${reuseCode(resultTypeInfo)} - - org.apache.flink.api.table.TableConfig config = null; - - public $generatedName(org.apache.flink.api.table.TableConfig config) { -this.config = config; -${reuseInitCode()} - } - - public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) { -$l
[28/50] [abbrv] flink git commit: [FLINK-3391] [tests] Fix typo in test config
[FLINK-3391] [tests] Fix typo in test config Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed7d3da7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed7d3da7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed7d3da7 Branch: refs/heads/tableOnCalcite Commit: ed7d3da7f93769a858f6d4d92c41d2aef69be013 Parents: cf3ae88 Author: Ufuk Celebi Authored: Thu Feb 11 16:13:26 2016 +0100 Committer: Ufuk Celebi Committed: Thu Feb 11 16:13:26 2016 +0100 -- .../java/org/apache/flink/test/checkpointing/SavepointITCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d3da7/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java -- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 46c0453..58f8225 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -441,7 +441,7 @@ public class SavepointITCase extends TestLogger { LOG.info("Created temporary savepoint directory: " + savepointDir + "."); config.setString(ConfigConstants.STATE_BACKEND, "filesystem"); - config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "fileystem"); + config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem"); config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
[06/50] [abbrv] flink git commit: [FLINK-3355] [rocksdb backend] Allow passing options to the RocksDB backend.
[FLINK-3355] [rocksdb backend] Allow passing options to the RocksDB backend. This also cleans up the generics in the RocksDB state classes. This closes #1608 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ee16794 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ee16794 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ee16794 Branch: refs/heads/tableOnCalcite Commit: 9ee16794909d18aa84e8d0b738a6a447d11e6eeb Parents: 28c6254 Author: Stephan Ewen Authored: Mon Feb 8 19:55:29 2016 +0100 Committer: Stephan Ewen Committed: Tue Feb 9 11:03:09 2016 +0100 -- .../streaming/state/AbstractRocksDBState.java | 113 +-- .../contrib/streaming/state/OptionsFactory.java | 31 + .../streaming/state/RocksDBListState.java | 68 ++- .../streaming/state/RocksDBReducingState.java | 86 +++--- .../streaming/state/RocksDBStateBackend.java| 76 +++-- .../streaming/state/RocksDBValueState.java | 74 ++-- 6 files changed, 273 insertions(+), 175 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/9ee16794/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java -- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index 783332c..05e15e8 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -1,36 +1,38 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.contrib.streaming.state; import org.apache.commons.io.FileUtils; + import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.KvState; import org.apache.flink.runtime.state.KvStateSnapshot; import org.apache.flink.util.HDFSCopyFromLocal; import org.apache.flink.util.HDFSCopyToLocal; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; + import org.rocksdb.BackupEngine; import org.rocksdb.BackupableDBOptions; import org.rocksdb.Env; @@ -38,7 +40,7 @@ import org.rocksdb.Options; import org.rocksdb.RestoreOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; -import org.rocksdb.StringAppendOperator; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,10 +62,9 @@ import static java.util.Objects.requireNonNull; * @param The type of the namespace. * @param The type of {@link State}. * @param The type of {@link StateDescriptor}. - * @param The type of the backend that snapshots this key/value state. */ -public abstract class AbstractRocksDBState, Backend extends AbstractStateBackend> - implements KvState, State { +public abstract class AbstractRocksDBState> + implements KvState, State { pr
[1/2] flink-web git commit: Update website for Flink 0.10.2
Repository: flink-web Updated Branches: refs/heads/asf-site ffe31bf53 -> 1e07e82a9 http://git-wip-us.apache.org/repos/asf/flink-web/blob/1e07e82a/content/news/2014/08/26/release-0.6.html -- diff --git a/content/news/2014/08/26/release-0.6.html b/content/news/2014/08/26/release-0.6.html index 602c685..01c634f 100644 --- a/content/news/2014/08/26/release-0.6.html +++ b/content/news/2014/08/26/release-0.6.html @@ -83,9 +83,9 @@ Latest Release (Stable) -http://ci.apache.org/projects/flink/flink-docs-release-0.10";>0.10.1 Documentation -http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java"; class="active">0.10.1 Javadocs -http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/scala/index.html"; class="active">0.10.1 ScalaDocs +http://ci.apache.org/projects/flink/flink-docs-release-0.10";>0.10.2 Documentation +http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java"; class="active">0.10.2 Javadocs +http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/scala/index.html"; class="active">0.10.2 ScalaDocs http://git-wip-us.apache.org/repos/asf/flink-web/blob/1e07e82a/content/news/2014/09/26/release-0.6.1.html -- diff --git a/content/news/2014/09/26/release-0.6.1.html b/content/news/2014/09/26/release-0.6.1.html index 9f6c7e7..c6f508c 100644 --- a/content/news/2014/09/26/release-0.6.1.html +++ b/content/news/2014/09/26/release-0.6.1.html @@ -83,9 +83,9 @@ Latest Release (Stable) -http://ci.apache.org/projects/flink/flink-docs-release-0.10";>0.10.1 Documentation -http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java"; class="active">0.10.1 Javadocs -http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/scala/index.html"; class="active">0.10.1 ScalaDocs +http://ci.apache.org/projects/flink/flink-docs-release-0.10";>0.10.2 Documentation +http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java"; class="active">0.10.2 Javadocs 
+http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/scala/index.html"; class="active">0.10.2 ScalaDocs http://git-wip-us.apache.org/repos/asf/flink-web/blob/1e07e82a/content/news/2014/10/03/upcoming_events.html -- diff --git a/content/news/2014/10/03/upcoming_events.html b/content/news/2014/10/03/upcoming_events.html index 01744b2..b1207b3 100644 --- a/content/news/2014/10/03/upcoming_events.html +++ b/content/news/2014/10/03/upcoming_events.html @@ -83,9 +83,9 @@ Latest Release (Stable) -http://ci.apache.org/projects/flink/flink-docs-release-0.10";>0.10.1 Documentation -http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java"; class="active">0.10.1 Javadocs -http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/scala/index.html"; class="active">0.10.1 ScalaDocs +http://ci.apache.org/projects/flink/flink-docs-release-0.10";>0.10.2 Documentation +http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java"; class="active">0.10.2 Javadocs +http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/scala/index.html"; class="active">0.10.2 ScalaDocs http://git-wip-us.apache.org/repos/asf/flink-web/blob/1e07e82a/content/news/2014/11/04/release-0.7.0.html -- diff --git a/content/news/2014/11/04/release-0.7.0.html b/content/news/2014/11/04/release-0.7.0.html index 9d97cb6..c7a3bf9 100644 --- a/content/news/2014/11/04/release-0.7.0.html +++ b/content/news/2014/11/04/release-0.7.0.html @@ -83,9 +83,9 @@ Latest Release (Stable) -http://ci.apache.org/projects/flink/flink-docs-release-0.10";>0.10.1 Documentation -http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java"; class="active">0.10.1 Javadocs -http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/scala/index.html"; class="active">0.10.1 ScalaDocs +http://ci.apache.org/projects/flink/flink-docs-release-0.10";>0.10.2 Documentation +http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java"; class="active">0.10.2 Javadocs 
+http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/scala/index
[2/2] flink-web git commit: Update website for Flink 0.10.2
Update website for Flink 0.10.2 Project: http://git-wip-us.apache.org/repos/asf/flink-web/repo Commit: http://git-wip-us.apache.org/repos/asf/flink-web/commit/1e07e82a Tree: http://git-wip-us.apache.org/repos/asf/flink-web/tree/1e07e82a Diff: http://git-wip-us.apache.org/repos/asf/flink-web/diff/1e07e82a Branch: refs/heads/asf-site Commit: 1e07e82a9d050262ac1fbb1c80e60dd134a462e7 Parents: ffe31bf Author: Ufuk Celebi Authored: Thu Feb 11 19:42:13 2016 +0100 Committer: Ufuk Celebi Committed: Thu Feb 11 19:42:13 2016 +0100 -- _config.yml | 30 +-- _posts/2016-02-11-release-0.10.2.md | 34 +++ content/blog/feed.xml | 37 +++ content/blog/index.html | 49 ++-- content/blog/page2/index.html | 49 ++-- content/blog/page3/index.html | 49 ++-- content/blog/page4/index.html | 57 +++-- content/blog/page5/index.html | 44 +++- content/community.html | 6 +- content/contribute-code.html| 6 +- content/contribute-documentation.html | 6 +- content/downloads.html | 42 ++-- content/faq.html| 6 +- content/features.html | 6 +- content/how-to-contribute.html | 6 +- content/improve-website.html| 6 +- content/index.html | 28 +-- content/material.html | 6 +- content/news/2012/08/21/release02.html | 6 +- content/news/2012/10/15/icde2013.html | 6 +- content/news/2012/11/12/btw2013demo.html| 6 +- content/news/2012/11/21/previewICDE2013.html| 6 +- content/news/2013/03/27/www-demo-paper.html | 6 +- content/news/2013/10/21/cikm2013-paper.html | 6 +- .../2013/12/13/humboldt-innovation-award.html | 6 +- .../2014/01/10/stratosphere-hadoop-summit.html | 6 +- .../news/2014/01/12/0.4-migration-guide.html| 6 +- .../2014/01/13/stratosphere-release-0.4.html| 6 +- .../26/optimizer_plan_visualization_tool.html | 6 +- content/news/2014/01/28/querying_mongodb.html | 6 +- .../18/amazon-elastic-mapreduce-cloud-yarn.html | 6 +- ...stratosphere-google-summer-of-code-2014.html | 6 +- .../16/stratosphere-goes-apache-incubator.html | 6 +- content/news/2014/05/31/release-0.5.html| 6 +- 
content/news/2014/08/26/release-0.6.html| 6 +- content/news/2014/09/26/release-0.6.1.html | 6 +- content/news/2014/10/03/upcoming_events.html| 6 +- content/news/2014/11/04/release-0.7.0.html | 6 +- .../news/2014/11/18/hadoop-compatibility.html | 6 +- content/news/2015/01/06/december-in-flink.html | 6 +- content/news/2015/01/21/release-0.8.html| 6 +- content/news/2015/02/04/january-in-flink.html | 6 +- content/news/2015/02/09/streaming-example.html | 6 +- .../news/2015/03/02/february-2015-in-flink.html | 6 +- .../peeking-into-Apache-Flinks-Engine-Room.html | 6 +- content/news/2015/04/07/march-in-flink.html | 6 +- .../2015/04/13/release-0.9.0-milestone1.html| 6 +- .../05/11/Juggling-with-Bits-and-Bytes.html | 6 +- .../news/2015/05/14/Community-update-April.html | 6 +- .../announcing-apache-flink-0.9.0-release.html | 6 +- .../2015/08/24/introducing-flink-gelly.html | 6 +- content/news/2015/09/01/release-0.9.1.html | 6 +- content/news/2015/09/03/flink-forward.html | 6 +- content/news/2015/09/16/off-heap-memory.html| 6 +- content/news/2015/11/16/release-0.10.0.html | 6 +- content/news/2015/11/27/release-0.10.1.html | 6 +- .../news/2015/12/04/Introducing-windows.html| 6 +- .../news/2015/12/11/storm-compatibility.html| 6 +- content/news/2015/12/18/a-year-in-review.html | 6 +- content/news/2016/02/11/release-0.10.2.html | 236 +++ content/privacy-policy.html | 6 +- content/project.html| 6 +- content/q/quickstart-scala.sh | 2 +- content/q/quickstart.sh | 2 +- q/quickstart-scala.sh | 2 +- q/quickstart.sh | 2 +- 66 files changed, 682 insertions(+), 287 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink-web/blob/1e07e82a/_config.yml -- diff --git a/_config.yml b/_config.yml index 88c2571..ef628a0 100644 --- a/_config.yml +++ b/_config.yml @@ -9,10 +9,10 @@ url: http://flink.apache.org DOCS_BASE_URL: http://ci.apache.org/projects/flink/ -FLINK_VERSION_STABLE: 0.10.1 -FLINK_VERSION_HADOOP
flink git commit: [FLINK-3270] Add Kafka example
Repository: flink Updated Branches: refs/heads/master 1dee62b4b -> b6bfcf008 [FLINK-3270] Add Kafka example This closes #1533 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b6bfcf00 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b6bfcf00 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b6bfcf00 Branch: refs/heads/master Commit: b6bfcf008e20eb4d4a3e81bedf7eaf871f121d4c Parents: 1dee62b Author: Robert Metzger Authored: Thu Jan 21 11:42:12 2016 +0100 Committer: Robert Metzger Committed: Fri Feb 12 15:02:28 2016 +0100 -- flink-examples/flink-examples-streaming/pom.xml | 51 ++ .../streaming/examples/kafka/ReadFromKafka.java | 63 + .../examples/kafka/WriteIntoKafka.java | 72 .../kafka/examples/ReadFromKafka.java | 56 --- .../kafka/examples/WriteIntoKafka.java | 70 --- 5 files changed, 186 insertions(+), 126 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/b6bfcf00/flink-examples/flink-examples-streaming/pom.xml -- diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml index ba49dc5..3ea3276 100644 --- a/flink-examples/flink-examples-streaming/pom.xml +++ b/flink-examples/flink-examples-streaming/pom.xml @@ -61,6 +61,12 @@ under the License. org.apache.flink + flink-connector-kafka-0.8_2.10 + ${project.version} + + + + org.apache.flink flink-streaming-java_2.10 ${project.version} test @@ -522,6 +528,51 @@ under the License. + + + + org.apache.maven.plugins + maven-shade-plugin + + + fat-jar-kafka-example + package + + shade + + + false + false + false + + + org.apache.flink.streaming.examples.kafka.ReadFromKafka + + + Kafka + + + + * + + org/apache/flink/streaming/examples/kafka/** + org/apache/flink/streaming/** + org/apache/kafka/** + org/apache/curator/** + org/apache/zookeeper/** + org/apache/jute/** + org/I0Itec/** + jline/** + com/yammer/** + kafka/** + + + + + + +
[4/4] flink git commit: [hotfix] Minor code cleanups in AbstractStateBackend
[hotfix] Minor code cleanups in AbstractStateBackend Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d93b154d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d93b154d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d93b154d Branch: refs/heads/master Commit: d93b154d41045e89dcc2238885db83fd947cd104 Parents: b6bfcf0 Author: Stephan Ewen Authored: Thu Feb 11 22:13:37 2016 +0100 Committer: Stephan Ewen Committed: Fri Feb 12 18:51:01 2016 +0100 -- .../apache/flink/runtime/state/AbstractStateBackend.java | 10 +- 1 file changed, 5 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/d93b154d/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java index e989af3..d0c4f82 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java @@ -123,7 +123,7 @@ public abstract class AbstractStateBackend implements java.io.Serializable { * @param The type of the namespace. * @param The type of the value that the {@code ValueState} can store. */ - abstract protected ValueState createValueState(TypeSerializer namespaceSerializer, ValueStateDescriptor stateDesc) throws Exception; + protected abstract ValueState createValueState(TypeSerializer namespaceSerializer, ValueStateDescriptor stateDesc) throws Exception; /** * Creates and returns a new {@link ListState}. @@ -134,7 +134,7 @@ public abstract class AbstractStateBackend implements java.io.Serializable { * @param The type of the namespace. * @param The type of the values that the {@code ListState} can store. 
*/ - abstract protected ListState createListState(TypeSerializer namespaceSerializer, ListStateDescriptor stateDesc) throws Exception; + protected abstract ListState createListState(TypeSerializer namespaceSerializer, ListStateDescriptor stateDesc) throws Exception; /** * Creates and returns a new {@link ReducingState}. @@ -145,7 +145,7 @@ public abstract class AbstractStateBackend implements java.io.Serializable { * @param The type of the namespace. * @param The type of the values that the {@code ListState} can store. */ - abstract protected ReducingState createReducingState(TypeSerializer namespaceSerializer, ReducingStateDescriptor stateDesc) throws Exception; + protected abstract ReducingState createReducingState(TypeSerializer namespaceSerializer, ReducingStateDescriptor stateDesc) throws Exception; /** * Sets the current key that is used for partitioned state. @@ -170,7 +170,7 @@ public abstract class AbstractStateBackend implements java.io.Serializable { * * @param stateDescriptor The state identifier for the state. This contains name * and can create a default state value. -* @param The type of the key. + * @param The type of the namespace. * @param The type of the state. * @@ -179,7 +179,7 @@ public abstract class AbstractStateBackend implements java.io.Serializable { * @throws Exception Exceptions may occur during initialization of the state and should be forwarded. */ @SuppressWarnings({"rawtypes", "unchecked"}) - public S getPartitionedState(final N namespace, final TypeSerializer namespaceSerializer, final StateDescriptor stateDescriptor) throws Exception { + public S getPartitionedState(final N namespace, final TypeSerializer namespaceSerializer, final StateDescriptor stateDescriptor) throws Exception { if (keySerializer == null) { throw new Exception("State key serializer has not been configured in the config. " +
[2/4] flink git commit: [FLINK-2991] Add Folding State and use in WindowOperator
http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 88e619a..0d01733 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -36,10 +37,9 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator; import org.apache.flink.streaming.api.functions.aggregation.SumAggregator; -import org.apache.flink.streaming.api.functions.windowing.FoldWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction; -import org.apache.flink.streaming.api.functions.windowing.ReduceIterableWindowFunction; -import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction; import 
org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows; @@ -71,7 +71,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; * If an {@link Evictor} is specified it will be used to evict elements from the window after * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window. * When using an evictor window performance will degrade significantly, since - * pre-aggregation of window results cannot be used. + * incremental aggregation of window results cannot be used. * * * Note that the {@code WindowedStream} is purely and API construct, during runtime @@ -120,7 +120,7 @@ public class WindowedStream { * * * Note: When using an evictor window performance will degrade significantly, since -* pre-aggregation of window results cannot be used. +* incremental aggregation of window results cannot be used. */ @PublicEvolving public WindowedStream evictor(Evictor evictor) { @@ -137,13 +137,14 @@ public class WindowedStream { * Applies a reduce function to the window. The window function is called for each evaluation * of the window for each key individually. The output of the reduce function is interpreted * as a regular non-windowed stream. +* * -* This window will try and pre-aggregate data as much as the window policies permit. For example, -* tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per -* key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval, +* This window will try and incrementally aggregate data as much as the window policies permit. +* For example, tumbling time windows can aggregate the data, meaning that only one element per +* key is stored. 
Sliding time windows will aggregate on the granularity of the slide interval, * so a few elements are stored per key (one per slide interval). -* Custom windows may not be able to pre-aggregate, or may need to store extra values in an -* aggregation tree. +* Custom windows may not be able to incrementally aggregate, or may need to store extra values +* in an aggregation tree. * * @param function The reduce function. * @return The data stream that is the result of applying the reduce function to the window. @@ -159,48 +160,14 @@ public class WindowedStream { function = input.getExecutionEnvironment().clean(function); String callLocation = Utils.getCallLocationName(); - String udfName = "Reduce at " + callLocation; + String udfNa
[1/4] flink git commit: [FLINK-2991] Adjust RocksDB Folding State to latest RocksDBStateBackend
Repository: flink Updated Branches: refs/heads/master b6bfcf008 -> e927ec0be [FLINK-2991] Adjust RocksDB Folding State to latest RocksDBStateBackend Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e927ec0b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e927ec0b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e927ec0b Branch: refs/heads/master Commit: e927ec0bee4cf951e49b838538efa679e1af13e2 Parents: 94cba89 Author: Stephan Ewen Authored: Fri Feb 12 15:35:08 2016 +0100 Committer: Stephan Ewen Committed: Fri Feb 12 18:51:01 2016 +0100 -- .../streaming/state/AbstractRocksDBState.java | 6 +- .../streaming/state/RocksDBFoldingState.java| 89 +++- .../streaming/state/RocksDBStateBackend.java| 3 +- .../runtime/state/AbstractStateBackend.java | 2 +- 4 files changed, 57 insertions(+), 43 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/e927ec0b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java -- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index 05e15e8..7d3172a 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -95,7 +95,8 @@ public abstract class AbstractRocksDBState keySerializer, + protected AbstractRocksDBState( + TypeSerializer keySerializer, TypeSerializer namespaceSerializer, File dbPath, String checkpointPath, @@ -139,7 +140,8 @@ public abstract class AbstractRocksDBState keySerializer, + protected AbstractRocksDBState( + TypeSerializer keySerializer, TypeSerializer 
namespaceSerializer, File dbPath, String checkpointPath, http://git-wip-us.apache.org/repos/asf/flink/blob/e927ec0b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java -- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java index 7e4e573..d7b75bd 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java @@ -21,13 +21,12 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.state.FoldingState; import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.KvState; -import org.apache.flink.runtime.state.KvStateSnapshot; + +import org.rocksdb.Options; import org.rocksdb.RocksDBException; import java.io.ByteArrayInputStream; @@ -39,16 +38,15 @@ import java.net.URI; import static java.util.Objects.requireNonNull; /** - * {@link ReducingState} implementation that stores state in RocksDB. + * {@link FoldingState} implementation that stores state in RocksDB. * * @param The type of the key. * @param The type of the namespace. * @param The type of the values that can be folded into the state. * @param The type of the value in the folding state. 
- * @param The type of the backend that snapshots this key/value state. */ -public class RocksDBFoldingState - extends AbstractRocksDBState, FoldingStateDescriptor, Backend> +public class RocksDBFoldingState + extends AbstractRocksDBState, FoldingStateDescriptor> implements FoldingState { /** Serializer for the values */ @@ -70,12 +68,16 @@ public class RocksDBFoldingState keySerializer, - TypeSerializer namespaceSeriali
[3/4] flink git commit: [FLINK-2991] Add Folding State and use in WindowOperator
[FLINK-2991] Add Folding State and use in WindowOperator This enables efficient incremental aggregation of fold window. This also adds: - WindowedStream.apply(initVal, foldFunction, windowFunction) - AllWindowedStream.apply(initVal, foldFunction, windowFunction) This closes #1605 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94cba899 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94cba899 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94cba899 Branch: refs/heads/master Commit: 94cba8998c726092e2cc80fd022ca40bf0c38ec2 Parents: d93b154 Author: Aljoscha Krettek Authored: Mon Feb 8 14:56:19 2016 +0100 Committer: Stephan Ewen Committed: Fri Feb 12 18:51:01 2016 +0100 -- .../streaming/state/RocksDBFoldingState.java| 187 ++ .../streaming/state/RocksDBListState.java | 9 +- .../streaming/state/RocksDBReducingState.java | 14 +- .../streaming/state/RocksDBStateBackend.java| 20 +- .../streaming/state/RocksDBValueState.java | 14 +- .../contrib/streaming/state/DbStateBackend.java | 18 ++ .../flink/api/common/state/FoldingState.java| 37 .../common/state/FoldingStateDescriptor.java| 108 ++ .../flink/api/common/state/StateBackend.java| 8 + .../flink/api/common/state/StateDescriptor.java | 2 +- .../runtime/state/AbstractStateBackend.java | 20 ++ .../runtime/state/GenericFoldingState.java | 133 + .../flink/runtime/state/GenericListState.java | 6 + .../runtime/state/GenericReducingState.java | 7 + .../state/filesystem/FsFoldingState.java| 145 ++ .../state/filesystem/FsStateBackend.java| 7 + .../runtime/state/memory/MemFoldingState.java | 118 +++ .../state/memory/MemoryStateBackend.java| 7 + .../runtime/state/StateBackendTestBase.java | 105 ++ .../api/datastream/AllWindowedStream.java | 103 -- .../api/datastream/WindowedStream.java | 198 +-- .../windowing/FoldAllWindowFunction.java| 92 - .../windowing/FoldApplyAllWindowFunction.java | 95 + .../windowing/FoldApplyWindowFunction.java | 95 + 
.../functions/windowing/FoldWindowFunction.java | 91 - .../windowing/PassThroughAllWindowFunction.java | 30 +++ .../windowing/PassThroughWindowFunction.java| 30 +++ .../windowing/ReduceAllWindowFunction.java | 30 --- .../windowing/ReduceWindowFunction.java | 30 --- .../ReduceWindowFunctionWithWindow.java | 31 --- .../operators/FoldApplyWindowFunctionTest.java | 143 ++ .../api/operators/FoldWindowFunctionTest.java | 132 - .../operators/windowing/WindowOperatorTest.java | 10 +- .../runtime/state/StateBackendITCase.java | 8 + .../streaming/api/scala/AllWindowedStream.scala | 60 ++ .../streaming/api/scala/WindowedStream.scala| 60 ++ 36 files changed, 1707 insertions(+), 496 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java -- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java new file mode 100644 index 000..7e4e573 --- /dev/null +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apac
flink git commit: [FLINK-3392] [kafka] Fix unsynchronized access in ClosableBlockingQueue
Repository: flink Updated Branches: refs/heads/master e927ec0be -> c47cb7af1 [FLINK-3392] [kafka] Fix unsynchronized access in ClosableBlockingQueue Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c47cb7af Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c47cb7af Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c47cb7af Branch: refs/heads/master Commit: c47cb7af1474b08ee1b7b7a813d03da022015ed1 Parents: e927ec0 Author: Stephan Ewen Authored: Fri Feb 12 19:30:22 2016 +0100 Committer: Stephan Ewen Committed: Fri Feb 12 19:30:22 2016 +0100 -- .../connectors/kafka/internals/ClosableBlockingQueue.java | 7 ++- 1 file changed, 6 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/c47cb7af/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java -- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java index 856c2ad..23ff276 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java @@ -104,7 +104,12 @@ public class ClosableBlockingQueue { * @return The number of elements currently in the queue. */ public int size() { - return elements.size(); + lock.lock(); + try { + return elements.size(); + } finally { + lock.unlock(); + } } /**
[2/3] flink git commit: [FLINK-3352] Use HDFS Config in RocksDB Copy Utilities
[FLINK-3352] Use HDFS Config in RocksDB Copy Utilities This also moves the utilities (HDFSCopyFromLocal and HDFSCopyToLocal) to the RocksDB package because we would need a HDFS dependency in flink-core otherwise. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f5d71909 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f5d71909 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f5d71909 Branch: refs/heads/master Commit: f5d719096083504a0b5827e35c1d28e1180e5e1d Parents: 31310cd Author: Aljoscha Krettek Authored: Thu Feb 11 19:07:18 2016 +0100 Committer: Aljoscha Krettek Committed: Fri Feb 12 22:59:14 2016 +0100 -- .../streaming/state/AbstractRocksDBState.java | 63 ++--- .../streaming/state/HDFSCopyFromLocal.java | 57 .../streaming/state/HDFSCopyToLocal.java| 58 .../streaming/state/RocksDBListState.java | 4 +- .../streaming/state/RocksDBReducingState.java | 6 +- .../streaming/state/RocksDBValueState.java | 4 +- .../streaming/state/HDFSCopyUtilitiesTest.java | 140 +++ .../apache/flink/util/HDFSCopyFromLocal.java| 48 --- .../org/apache/flink/util/HDFSCopyToLocal.java | 49 --- 9 files changed, 309 insertions(+), 120 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/f5d71909/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java -- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index 6e4adf5..76f05d6 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -24,12 +24,11 @@ import 
org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot; import org.apache.flink.runtime.state.KvState; import org.apache.flink.runtime.state.KvStateSnapshot; -import org.apache.flink.util.HDFSCopyFromLocal; -import org.apache.flink.util.HDFSCopyToLocal; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -46,7 +45,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.net.URI; import java.util.UUID; @@ -73,6 +74,8 @@ import static java.util.Objects.requireNonNull; public abstract class AbstractRocksDBState> implements KvState, State { + private static final String HADOOP_CONF_NAME = "hadoop-conf.binary"; + private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBState.class); /** Serializer for the keys */ @@ -96,10 +99,15 @@ public abstract class AbstractRocksDBState namespaceSerializer, SD stateDesc) { - this.dbPath = dbPath; + this.basePath = basePath; this.checkpointPath = checkpointPath; this.backupUri = backupUri; this.checkpointId = checkpointId; @@ -339,7 +368,7 @@ public abstract class AbstractRocksDBState keySerializer, TypeSerializer namespaceSerializer, SD stateDesc, - File dbPath, + File basePath, String backupPath, String restorePath, Options options) throws Exception; @@ -360,13 +389,13 @@ public abstract class AbstractRocksDBState materialize() throws Exception { try { - HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri); + HDFSCopyFromLocal.copyFromLocal(state.hadoopConfPath, 
localBackupPath, backupUri); return state.createRocksDBSnapshot(backupUri, checkpointId); } catch (Exception e) { -
[3/3] flink git commit: [FLINK-3359] Make RocksDB File Copies Asynchronous
[FLINK-3359] Make RocksDB File Copies Asynchronous Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31310cd6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31310cd6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31310cd6 Branch: refs/heads/master Commit: 31310cd657381d4035cda9659d737ed440f90d26 Parents: c47cb7a Author: Aljoscha Krettek Authored: Thu Feb 11 17:29:37 2016 +0100 Committer: Aljoscha Krettek Committed: Fri Feb 12 22:59:14 2016 +0100 -- .../flink-statebackend-rocksdb/pom.xml | 15 +- .../streaming/state/AbstractRocksDBState.java | 161 +- .../streaming/state/RocksDBFoldingState.java| 2 +- .../streaming/state/RocksDBListState.java | 2 +- .../streaming/state/RocksDBReducingState.java | 2 +- .../streaming/state/RocksDBStateBackend.java| 6 +- .../streaming/state/RocksDBValueState.java | 2 +- .../state/RocksDBAsyncKVSnapshotTest.java | 208 +++ .../state/RocksDBStateBackendTest.java | 4 +- .../state/AsynchronousKvStateSnapshot.java | 62 ++ .../runtime/state/StateBackendTestBase.java | 180 ++-- .../streaming/runtime/tasks/StreamTask.java | 26 ++- .../runtime/tasks/StreamTaskStateList.java | 58 +++--- .../tasks/OneInputStreamTaskTestHarness.java| 7 + .../tasks/StreamTaskAsyncCheckpointTest.java| 3 +- .../EventTimeWindowCheckpointingITCase.java | 6 + 16 files changed, 584 insertions(+), 160 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/31310cd6/flink-contrib/flink-statebackend-rocksdb/pom.xml -- diff --git a/flink-contrib/flink-statebackend-rocksdb/pom.xml b/flink-contrib/flink-statebackend-rocksdb/pom.xml index 999c496..9c1601e 100644 --- a/flink-contrib/flink-statebackend-rocksdb/pom.xml +++ b/flink-contrib/flink-statebackend-rocksdb/pom.xml @@ -57,7 +57,6 @@ under the License. rocksdbjni 4.1.0 - org.apache.flink flink-runtime_2.10 @@ -65,7 +64,17 @@ under the License. 
test-jar test - + + org.apache.flink + flink-streaming-java_2.10 + ${project.version} + test-jar + test + + + junit + junit + 4.11 + - http://git-wip-us.apache.org/repos/asf/flink/blob/31310cd6/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java -- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index 7d3172a..6e4adf5 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -24,11 +24,12 @@ import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot; import org.apache.flink.runtime.state.KvState; import org.apache.flink.runtime.state.KvStateSnapshot; + import org.apache.flink.util.HDFSCopyFromLocal; import org.apache.flink.util.HDFSCopyToLocal; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -48,6 +49,7 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.net.URI; +import java.util.UUID; import static java.util.Objects.requireNonNull; @@ -58,6 +60,11 @@ import static java.util.Objects.requireNonNull; * checkpointing/restoring the database and for disposal in the {@link #dispose()} method. The * concrete subclasses just use the RocksDB handle to store/retrieve state. * + * State is checkpointed asynchronously. 
The synchronous part is drawing the actual backup + * from RocksDB, this is done in {@link #snapshot(long, long)}. This will return a + * {@link AsyncRocksDBSnapshot} that will perform the
[1/3] flink git commit: [hotfix] Fix field names in RocksDBStateBackend
Repository: flink Updated Branches: refs/heads/master c47cb7af1 -> 8b7caaa22 [hotfix] Fix field names in RocksDBStateBackend Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b7caaa2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b7caaa2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b7caaa2 Branch: refs/heads/master Commit: 8b7caaa22fee9a131292c1a3fdb45872d6836dbf Parents: f5d7190 Author: Aljoscha Krettek Authored: Fri Feb 12 18:13:39 2016 +0100 Committer: Aljoscha Krettek Committed: Fri Feb 12 22:59:14 2016 +0100 -- .../streaming/state/RocksDBStateBackend.java| 32 ++-- 1 file changed, 16 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/8b7caaa2/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java -- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 72a4c58..e3b4f4d 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -83,10 +83,10 @@ public class RocksDBStateBackend extends AbstractStateBackend { // DB storage directories /** Base paths for RocksDB directory, as configured. May be null. 
*/ - private Path[] dbBasePaths; + private Path[] configuredDbBasePaths; /** Base paths for RocksDB directory, as initialized */ - private File[] dbStorageDirectories; + private File[] initializedDbBasePaths; private int nextDirectory; @@ -171,15 +171,15 @@ public class RocksDBStateBackend extends AbstractStateBackend { this.jobId = env.getJobID(); // initialize the paths where the local RocksDB files should be stored - if (dbBasePaths == null) { + if (configuredDbBasePaths == null) { // initialize from the temp directories - dbStorageDirectories = env.getIOManager().getSpillingDirectories(); + initializedDbBasePaths = env.getIOManager().getSpillingDirectories(); } else { - List<File> dirs = new ArrayList<>(dbBasePaths.length); + List<File> dirs = new ArrayList<>(configuredDbBasePaths.length); String errorMessage = ""; - for (Path path : dbBasePaths) { + for (Path path : configuredDbBasePaths) { File f = new File(path.toUri().getPath()); if (!f.exists() && !f.mkdirs()) { String msg = "Local DB files directory '" + f.getAbsolutePath() @@ -193,11 +193,11 @@ public class RocksDBStateBackend extends AbstractStateBackend { if (dirs.isEmpty()) { throw new Exception("No local storage directories available. " + errorMessage); } else { - dbStorageDirectories = dirs.toArray(new File[dirs.size()]); + initializedDbBasePaths = dirs.toArray(new File[dirs.size()]); } } - nextDirectory = new Random().nextInt(dbStorageDirectories.length); + nextDirectory = new Random().nextInt(initializedDbBasePaths.length); } @Override @@ -225,15 +225,15 @@ public class RocksDBStateBackend extends AbstractStateBackend { } File[] getStoragePaths() { - return dbStorageDirectories; + return initializedDbBasePaths; } File getNextStoragePath() { int ni = nextDirectory + 1; - ni = ni >= dbStorageDirectories.length ? 0 : ni; + ni = ni >= initializedDbBasePaths.length ?
0 : ni; nextDirectory = ni; - return dbStorageDirectories[ni]; + return initializedDbBasePaths[ni]; } // @@ -330,7 +330,7 @@ public class RocksDBStateBackend extends AbstractStateBackend { */ public void setDbStoragePaths(String...
flink git commit: [FLINK-3393] [core] ExternalProcessRunner wait to finish copying error stream
Repository: flink Updated Branches: refs/heads/master 8b7caaa22 -> 8e55bbd41 [FLINK-3393] [core] ExternalProcessRunner wait to finish copying error stream Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e55bbd4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e55bbd4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e55bbd4 Branch: refs/heads/master Commit: 8e55bbd41725cd8c7f29a9e41993a22777ebdacf Parents: 8b7caaa Author: Greg Hogan Authored: Fri Feb 12 08:38:05 2016 -0500 Committer: Aljoscha Krettek Committed: Fri Feb 12 23:04:06 2016 +0100 -- .../java/org/apache/flink/util/ExternalProcessRunner.java | 7 ++- 1 file changed, 6 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/8e55bbd4/flink-core/src/main/java/org/apache/flink/util/ExternalProcessRunner.java -- diff --git a/flink-core/src/main/java/org/apache/flink/util/ExternalProcessRunner.java b/flink-core/src/main/java/org/apache/flink/util/ExternalProcessRunner.java index 8e4725c..b7e388c 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ExternalProcessRunner.java +++ b/flink-core/src/main/java/org/apache/flink/util/ExternalProcessRunner.java @@ -41,6 +41,8 @@ public class ExternalProcessRunner { private final Process process; + private final Thread pipeForwarder; + final StringWriter errorOutput = new StringWriter(); /** @@ -63,7 +65,7 @@ public class ExternalProcessRunner { process = new ProcessBuilder(commandList).start(); - new PipeForwarder(process.getErrorStream(), errorOutput); + pipeForwarder = new PipeForwarder(process.getErrorStream(), errorOutput); } /** @@ -83,6 +85,9 @@ public class ExternalProcessRunner { try { int returnCode = process.waitFor(); + // wait to finish copying standard error stream + pipeForwarder.join(); + if (returnCode != 0) { // determine whether we failed because of a ClassNotFoundException and forward that if 
(getErrorOutput().toString().contains("Error: Could not find or load main class " + entryPointClassName)) {
flink git commit: [FLINK-3107] [runtime] Defer start of checkpoint ID counter
Repository: flink Updated Branches: refs/heads/master 8e55bbd41 -> 937963e33 [FLINK-3107] [runtime] Defer start of checkpoint ID counter Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/937963e3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/937963e3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/937963e3 Branch: refs/heads/master Commit: 937963e339ddd57cc65e3da8af5398600e4d9ad2 Parents: 8e55bbd Author: Ufuk Celebi Authored: Fri Feb 12 22:30:13 2016 +0100 Committer: Ufuk Celebi Committed: Fri Feb 12 23:17:04 2016 +0100 -- .../flink/runtime/checkpoint/SavepointCoordinator.java | 1 + .../runtime/checkpoint/SavepointCoordinatorTest.java | 13 - 2 files changed, 13 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/937963e3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java index 6ce6502..ea4b8ae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java @@ -230,6 +230,7 @@ public class SavepointCoordinator extends CheckpointCoordinator { // Reset the checkpoint ID counter long nextCheckpointId = checkpoint.getCheckpointID(); + checkpointIdCounter.start(); checkpointIdCounter.setCount(nextCheckpointId + 1); LOG.info("Reset the checkpoint ID to {}", nextCheckpointId); http://git-wip-us.apache.org/repos/asf/flink/blob/937963e3/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java index 4f9ae60..6bbdf62 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java @@ -175,6 +175,7 @@ public class SavepointCoordinatorTest { } } + MockCheckpointIdCounter idCounter = new MockCheckpointIdCounter(); StateStore savepointStore = new HeapStateStore<>(); SavepointCoordinator coordinator = createSavepointCoordinator( @@ -184,7 +185,7 @@ public class SavepointCoordinatorTest { triggerVertices, ackVertices, new ExecutionVertex[] {}, - new MockCheckpointIdCounter(), + idCounter, savepointStore); Future savepointPathFuture = coordinator.triggerSavepoint(1231273123); @@ -213,6 +214,9 @@ public class SavepointCoordinatorTest { // Verify all promises removed assertEquals(0, getSavepointPromises(coordinator).size()); + // Verify checkpoint ID counter started + assertTrue(idCounter.isStarted()); + coordinator.shutdown(); } @@ -1083,15 +1087,18 @@ public class SavepointCoordinatorTest { private static class MockCheckpointIdCounter implements CheckpointIDCounter { + private boolean started; private long count; private long lastReturnedCount; @Override public void start() throws Exception { + started = true; } @Override public void stop() throws Exception { + started = false; } @Override @@ -1108,5 +1115,9 @@ public class SavepointCoordinatorTest { long getLastReturnedCount() { return lastReturnedCount; } + + public boolean isStarted() { + return started; + } } }