flink git commit: [hotfix][travis] deploy snapshots only for master branch

2016-02-12 Thread mxm
Repository: flink
Updated Branches:
  refs/heads/master edae79340 -> 1dee62b4b


[hotfix][travis] deploy snapshots only for master branch
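
The eligibility check reduces to three environment variables that Travis CI sets on every build. As a minimal illustration, the same predicate in Java (the variable names are the ones read by the script in the diff below; the DeployGate class itself is hypothetical):

    // Minimal sketch of the deploy-eligibility predicate in tools/deploy_to_maven.sh,
    // transcribed to Java for illustration; the DeployGate class is hypothetical.
    public final class DeployGate {
        static boolean isEligible() {
            // Travis CI sets these variables on every build.
            boolean notPullRequest = "false".equals(System.getenv("TRAVIS_PULL_REQUEST"));
            boolean mainRepo = "apache/flink".equals(System.getenv("TRAVIS_REPO_SLUG"));
            boolean masterBranch = "master".equals(System.getenv("TRAVIS_BRANCH"));
            // The hotfix adds the branch check, so forks and feature branches no longer deploy.
            return notPullRequest && mainRepo && masterBranch;
        }
    }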


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1dee62b4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1dee62b4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1dee62b4

Branch: refs/heads/master
Commit: 1dee62b4b691fe29f6975844366b7328cf6c41d3
Parents: edae793
Author: Maximilian Michels 
Authored: Fri Feb 12 11:28:52 2016 +0100
Committer: Maximilian Michels 
Committed: Fri Feb 12 11:30:36 2016 +0100

--
 tools/deploy_to_maven.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/1dee62b4/tools/deploy_to_maven.sh
--
diff --git a/tools/deploy_to_maven.sh b/tools/deploy_to_maven.sh
index 8ace5aa..b2c6cce 100755
--- a/tools/deploy_to_maven.sh
+++ b/tools/deploy_to_maven.sh
@@ -72,7 +72,7 @@ pwd
 
 # Check if push/commit is eligible for deploying
 echo "Job: $TRAVIS_JOB_NUMBER ; isPR: $TRAVIS_PULL_REQUEST ; repo slug : 
$TRAVIS_REPO_SLUG "
-if [[ $TRAVIS_PULL_REQUEST == "false" ]] && [[ $TRAVIS_REPO_SLUG == 
"apache/flink" ]] ; then
+if [[ $TRAVIS_PULL_REQUEST == "false" ]] && [[ $TRAVIS_REPO_SLUG == 
"apache/flink" ]] && [[ $TRAVIS_BRANCH == "master" ]] ; then
 
echo "install lifecylce mapping fake plugin"
git clone 
https://github.com/mfriedenhagen/dummy-lifecycle-mapping-plugin.git



[23/50] [abbrv] flink git commit: [FLINK-3260] [runtime] Enforce terminal state of Executions

2016-02-12 Thread fhueske
[FLINK-3260] [runtime] Enforce terminal state of Executions

This commit fixes the problem that an Execution could leave its terminal state
FINISHED and transition to FAILED. Such a transition is propagated to the
ExecutionGraph, where it entails JobStatus changes. Since the Execution has
already reached a terminal state, it should not affect the ExecutionGraph again.
Otherwise this can lead to an inconsistent state in case of a restart, where the
old Executions are disassociated from the ExecutionGraph.

This closes #1613
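
The guard added in transitionState (see the diff below) is a compare-and-swap state machine with a terminal-state check up front. A minimal sketch of the pattern, assuming a simplified ExecutionState enum with an isTerminal() flag (illustrative, not Flink's actual class):

    import java.util.concurrent.atomic.AtomicReference;

    // Simplified sketch of the terminal-state guard; ExecutionState here is a
    // stand-in for Flink's enum, and the class is illustrative, not Flink's code.
    class StateMachine {
        enum ExecutionState {
            CREATED, RUNNING, FINISHED, CANCELED, FAILED;
            boolean isTerminal() { return this == FINISHED || this == CANCELED || this == FAILED; }
        }

        private final AtomicReference<ExecutionState> state = new AtomicReference<>(ExecutionState.CREATED);

        boolean transitionState(ExecutionState current, ExecutionState target) {
            // Terminal states are absorbing: leaving them would corrupt bookkeeping
            // upstream (the ExecutionGraph, in Flink's case), so fail loudly.
            if (current.isTerminal()) {
                throw new IllegalStateException("Cannot leave terminal state " + current + " to transition to " + target + ".");
            }
            // Compare-and-set so that concurrent transitions race safely: only one wins.
            return state.compareAndSet(current, target);
        }
    }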


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6968a57a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6968a57a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6968a57a

Branch: refs/heads/tableOnCalcite
Commit: 6968a57a1a31a11b33bacd2c94d6559bcabd6eb9
Parents: 48b7454
Author: Till Rohrmann 
Authored: Tue Feb 9 10:30:12 2016 +0100
Committer: Stephan Ewen 
Committed: Wed Feb 10 15:34:37 2016 +0100

--
 .../flink/runtime/executiongraph/Execution.java |  14 +-
 .../ExecutionGraphRestartTest.java  |  90 +
 .../runtime/testingUtils/TestingCluster.scala   |   6 +-
 .../testingUtils/TestingTaskManagerLike.scala   |   4 +-
 .../runtime/testingUtils/TestingUtils.scala | 133 ++-
 5 files changed, 233 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6968a57a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index eb2e68c..db037bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -435,7 +435,7 @@ public class Execution implements Serializable {
return;
}
else if (current == CREATED || current == SCHEDULED) {
-			// from here, we can directly switch to cancelled, because the no task has been deployed
+			// from here, we can directly switch to cancelled, because no task has been deployed
 			if (transitionState(current, CANCELED)) {
 
 				// we skip the canceling state. set the timestamp, for a consistent appearance
@@ -754,11 +754,10 @@ public class Execution implements Serializable {
return false;
}
 
-		if (current == CANCELED) {
-			// we are already aborting or are already aborted
+		if (current == CANCELED || current == FINISHED) {
+			// we are already aborting or are already aborted or we are already finished
 			if (LOG.isDebugEnabled()) {
-				LOG.debug(String.format("Ignoring transition of vertex %s to %s while being %s",
-					getVertexWithAttempt(), FAILED, CANCELED));
+				LOG.debug("Ignoring transition of vertex {} to {} while being {}.", getVertexWithAttempt(), FAILED, current);
}
return false;
}
@@ -928,6 +927,11 @@ public class Execution implements Serializable {
}
 
 	private boolean transitionState(ExecutionState currentState, ExecutionState targetState, Throwable error) {
+		// sanity check
+		if (currentState.isTerminal()) {
+			throw new IllegalStateException("Cannot leave terminal state " + currentState + " to transition to " + targetState + ".");
+		}
+
 		if (STATE_UPDATER.compareAndSet(this, currentState, targetState)) {
 			markTimestamp(targetState);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6968a57a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
--
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 0c3af8f..47a48a2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b

[37/50] [abbrv] flink git commit: Renamed Table.scala to table.scala

2016-02-12 Thread fhueske
Renamed Table.scala to table.scala


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4abca1d0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4abca1d0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4abca1d0

Branch: refs/heads/tableOnCalcite
Commit: 4abca1d0e3bba30f6adee39f9b278020fdf0f4fc
Parents: ed6cc91
Author: Fabian Hueske 
Authored: Mon Jan 25 15:34:24 2016 +0100
Committer: Fabian Hueske 
Committed: Fri Feb 12 11:34:08 2016 +0100

--
 .../org/apache/flink/api/table/Table.scala  | 392 ---
 .../org/apache/flink/api/table/table.scala  | 392 +++
 2 files changed, 392 insertions(+), 392 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/4abca1d0/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
--
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
deleted file mode 100644
index 271aa99..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataTypeField
-import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.tools.RelBuilder
-import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey}
-import org.apache.flink.api.table.plan.RexNodeTranslator
-import RexNodeTranslator.{toRexNode, extractAggCalls}
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.parser.ExpressionParser
-
-import scala.collection.JavaConverters._
-
-case class BaseTable(
-    private[flink] val relNode: RelNode,
-    private[flink] val relBuilder: RelBuilder)
-
-/**
- * The abstraction for writing Table API programs. Similar to how the batch and streaming APIs
- * have [[org.apache.flink.api.scala.DataSet]] and
- * [[org.apache.flink.streaming.api.scala.DataStream]].
- *
- * Use the methods of [[Table]] to transform data. Use
- * [[org.apache.flink.api.java.table.TableEnvironment]] to convert a [[Table]] back to a DataSet
- * or DataStream.
- *
- * When using Scala a [[Table]] can also be converted using implicit conversions.
- *
- * Example:
- *
- * {{{
- *   val table = set.toTable('a, 'b)
- *   ...
- *   val table2 = ...
- *   val set = table2.toDataSet[MyType]
- * }}}
- *
- * Operations such as [[join]], [[select]], [[where]] and [[groupBy]] either take arguments
- * in a Scala DSL or as an expression String. Please refer to the documentation for the expression
- * syntax.
- */
-class Table(
-  private[flink] override val relNode: RelNode,
-  private[flink] override val relBuilder: RelBuilder)
-  extends BaseTable(relNode, relBuilder)
-{
-
-  /**
-   * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
-   * can contain complex expressions and aggregations.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.select('key, 'value.avg + " The average" as 'average, 'other.substring(0, 10))
-   * }}}
-   */
-  def select(fields: Expression*): Table = {
-
-    relBuilder.push(relNode)
-
-    // separate aggregations and selection expressions
-    val extractedAggCalls: List[(Expression, List[AggCall])] = fields
-      .map(extractAggCalls(_, relBuilder)).toList
-
-    // get aggregation calls
-    val aggCalls: List[AggCall] = extractedAggCalls
-      .map(_._2).reduce( (x,y) => x ::: y)
-
-    // apply aggregations
-    if (aggCalls.nonEmpty) {
-      val emptyKey: GroupKey = relBuilder.groupKey()
-      relBuilder.aggregate(emptyKey, aggCalls.toIterable.asJava)
-    }
-
-    // get selection expressions
-    val exprs: List[RexNode] = extractedAggCalls
-      .map(_._1)
-      .map(toRexNode(_, relB

[36/50] [abbrv] flink git commit: [FLINK-3223] Translate Table API calls to Calcite RelNodes.

2016-02-12 Thread fhueske
[FLINK-3223] Translate Table API calls to Calcite RelNodes.

This is an intermediate step to port the Table API on top of Calcite (FLINK-3221).
This commit:
- Adds Calcite as dependency to flink-table.
- Translates Table API calls directly into Calcite RelNodes (see the sketch after this list).
- Modifies tests to check only the translation into logical plans but not the execution of Table API queries.
- Deactivates a few tests that are not supported yet.
- Removes a lot of the former Table API translation code.
- Removes bitwise operators from the Table API.
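
As a rough illustration of what translating into Calcite RelNodes means, here is a self-contained Java sketch using Calcite's generic RelBuilder API. The ORDERS/AMOUNT schema is hypothetical, and this is plain Calcite usage, not the flink-table translator:

    import org.apache.calcite.rel.RelNode;
    import org.apache.calcite.tools.FrameworkConfig;
    import org.apache.calcite.tools.RelBuilder;

    // Plain Calcite usage, not the flink-table translator: builds the logical plan
    // that a Table API chain like orders.filter('amount === 100).select('amount)
    // maps to. "ORDERS" and "AMOUNT" are hypothetical schema elements.
    public class RelBuilderSketch {
        public static RelNode buildPlan(FrameworkConfig config) {
            RelBuilder builder = RelBuilder.create(config);
            return builder
                .scan("ORDERS")                                                        // table scan
                .filter(builder.equals(builder.field("AMOUNT"), builder.literal(100))) // WHERE amount = 100
                .project(builder.field("AMOUNT"))                                      // SELECT amount
                .build();                                                              // -> RelNode tree
        }
    }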


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed6cc91e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed6cc91e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed6cc91e

Branch: refs/heads/tableOnCalcite
Commit: ed6cc91e227bc6b053ca9e0142a680f6301bdaf6
Parents: 1dee62b
Author: Fabian Hueske 
Authored: Fri Jan 15 17:46:39 2016 +0100
Committer: Fabian Hueske 
Committed: Fri Feb 12 11:34:08 2016 +0100

--
 flink-libraries/flink-table/pom.xml |   6 +
 .../api/java/table/JavaBatchTranslator.scala| 340 ++-
 .../java/table/JavaStreamingTranslator.scala| 241 -
 .../flink/api/java/table/TableEnvironment.scala |  43 +--
 .../api/scala/table/DataStreamConversions.scala |  68 
 .../api/scala/table/ScalaBatchTranslator.scala  |  26 +-
 .../scala/table/ScalaStreamingTranslator.scala  |  58 
 .../api/scala/table/TableConversions.scala  |  12 +-
 .../flink/api/scala/table/expressionDsl.scala   |   5 -
 .../apache/flink/api/scala/table/package.scala  |  21 +-
 .../org/apache/flink/api/table/Table.scala  | 236 +
 .../table/codegen/ExpressionCodeGenerator.scala |  35 --
 .../api/table/expressions/aggregations.scala|  40 +--
 .../analysis/ExtractEquiJoinFields.scala|  70 
 .../expressions/analysis/GroupByAnalyzer.scala  |  51 ---
 .../expressions/analysis/InsertAutoCasts.scala  |  92 -
 .../analysis/PredicateAnalyzer.scala|  35 --
 .../analysis/ResolveFieldReferences.scala   |  60 
 .../analysis/SelectionAnalyzer.scala|  36 --
 .../table/expressions/analysis/TypeCheck.scala  |  57 
 .../expressions/analysis/VerifyBoolean.scala|  41 ---
 .../analysis/VerifyNoAggregates.scala   |  53 ---
 .../analysis/VerifyNoNestedAggregates.scala |  54 ---
 .../api/table/expressions/arithmetic.scala  |  56 +--
 .../flink/api/table/expressions/cast.scala  |   5 +
 .../api/table/expressions/fieldExpression.scala |   5 +
 .../api/table/parser/ExpressionParser.scala |  16 +-
 .../api/table/plan/ExpandAggregations.scala | 147 
 .../flink/api/table/plan/PlanTranslator.scala   | 133 ++--
 .../api/table/plan/RexNodeTranslator.scala  | 184 ++
 .../flink/api/table/plan/TypeConverter.scala|  54 +++
 .../flink/api/table/plan/operations.scala   | 134 
 .../api/table/plan/operators/DataSetTable.scala |  66 
 .../apache/flink/api/table/plan/package.scala   |  24 --
 .../apache/flink/api/table/trees/Analyzer.scala |  43 ---
 .../org/apache/flink/api/table/trees/Rule.scala |  30 --
 .../examples/scala/StreamingTableFilter.scala   |  90 -
 .../api/java/table/test/AggregationsITCase.java |  67 ++--
 .../flink/api/java/table/test/AsITCase.java |  68 ++--
 .../api/java/table/test/CastingITCase.java  |  62 ++--
 .../api/java/table/test/ExpressionsITCase.java  |  83 +
 .../flink/api/java/table/test/FilterITCase.java |  58 ++--
 .../table/test/GroupedAggregationsITCase.java   |  35 +-
 .../flink/api/java/table/test/JoinITCase.java   |  70 ++--
 .../api/java/table/test/PojoGroupingITCase.java |  18 +-
 .../flink/api/java/table/test/SelectITCase.java |  70 ++--
 .../api/java/table/test/SqlExplainITCase.java   |   7 +
 .../table/test/StringExpressionsITCase.java |  45 +--
 .../flink/api/java/table/test/UnionITCase.java  |  44 +--
 .../scala/table/test/PageRankTableITCase.java   |   7 +-
 .../scala/table/test/TypeExceptionTest.scala|  42 ---
 .../scala/table/test/AggregationsITCase.scala   | 104 +++---
 .../flink/api/scala/table/test/AsITCase.scala   |  74 ++--
 .../api/scala/table/test/CastingITCase.scala|  57 ++--
 .../scala/table/test/ExpressionsITCase.scala|  90 ++---
 .../api/scala/table/test/FilterITCase.scala |  59 ++--
 .../table/test/GroupedAggregationsITCase.scala  |  63 ++--
 .../flink/api/scala/table/test/JoinITCase.scala |  73 ++--
 .../api/scala/table/test/SelectITCase.scala |  99 +++---
 .../api/scala/table/test/SqlExplainITCase.scala | 198 +--
 .../table/test/StringExpressionsITCase.scala|  52 +--
 .../api/scala/table/test/UnionITCase.scala  |  34 +-
 62 files changed, 1314 insertions(+), 2932 deletions(-)
--


http://git-wip-us.apache.org/repos/as

[50/50] [abbrv] flink git commit: [FLINK-3226] implement getUniqueName method in TranslationContext

2016-02-12 Thread fhueske
[FLINK-3226] implement getUniqueName method in TranslationContext

This closes #1600 and #1567
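
The change below swaps hash-based temporary names ("TMP_" + hashCode) for a counter, which actually guarantees uniqueness: equal expressions hash alike, a counter never repeats. A Java sketch of the idea (illustrative; the real code is the Scala TranslationContext in the diff):

    import java.util.concurrent.atomic.AtomicInteger;

    // Sketch of the counter-based naming scheme; mirrors the Scala
    // TranslationContext.getUniqueName in the diff below, but is illustrative Java.
    final class NameGenerator {
        private static final AtomicInteger NAME_CNTR = new AtomicInteger(0);

        // A monotonically increasing counter can never hand out the same
        // name twice, even for structurally equal expressions.
        static String getUniqueName() {
            return "TMP_" + NAME_CNTR.getAndIncrement();
        }
    }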


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7428263
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7428263
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7428263

Branch: refs/heads/tableOnCalcite
Commit: e742826326c2719f2f5e06a5e4020f743c3278f9
Parents: 18e7f2f
Author: vasia 
Authored: Thu Feb 11 14:24:24 2016 +0100
Committer: Fabian Hueske 
Committed: Fri Feb 12 11:34:10 2016 +0100

--
 .../flink/api/table/plan/RexNodeTranslator.scala  |  2 +-
 .../flink/api/table/plan/TranslationContext.scala |  4 
 .../plan/nodes/dataset/DataSetGroupReduce.scala   |  2 +-
 .../api/java/table/test/AggregationsITCase.java   |  4 +---
 .../api/scala/table/test/AggregationsITCase.scala | 18 +++---
 .../api/scala/table/test/ExpressionsITCase.scala  |  1 -
 6 files changed, 22 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/e7428263/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
--
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
index 07e3924..bad111f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
@@ -35,7 +35,7 @@ object RexNodeTranslator {
 
     exp match {
       case agg: Aggregation =>
-        val name = "TMP_" + agg.hashCode().toHexString.toUpperCase
+        val name = TranslationContext.getUniqueName
         val aggCall = toAggCall(agg, name, relBuilder)
         val fieldExp = new UnresolvedFieldReference(name)
         (fieldExp, List(aggCall))

http://git-wip-us.apache.org/repos/asf/flink/blob/e7428263/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
--
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
index b2b0c2b..51af8d6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
@@ -70,6 +70,10 @@ object TranslationContext {
 
   }
 
+  def getUniqueName: String = {
+    "TMP_" + nameCntr.getAndIncrement()
+  }
+
   def getRelBuilder: RelBuilder = {
 relBuilder
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7428263/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
--
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
index ad7e0e9..afe09bb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
@@ -67,7 +67,7 @@ class DataSetGroupReduce(
   config: TableConfig,
   expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
-    val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config)
+    val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config, expectedType)
 
 // get the output types
 val fieldsNames = rowType.getFieldNames

http://git-wip-us.apache.org/repos/asf/flink/blob/e7428263/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
--
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
index 8e81893..30598c4 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
@@ -62,7 +62,6 @@ public class AggregationsITCase exten

[39/50] [abbrv] flink git commit: [FLINK-3226] Translate logical plan FlinkRels into physical plan DataSetRels.

2016-02-12 Thread fhueske
[FLINK-3226] Translate logical plan FlinkRels into physical plan DataSetRels.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6dd2d779
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6dd2d779
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6dd2d779

Branch: refs/heads/tableOnCalcite
Commit: 6dd2d779d18113034311f13512deb5a7afbf885e
Parents: a4ad9dd
Author: chengxiang li 
Authored: Mon Feb 1 15:18:14 2016 +0800
Committer: Fabian Hueske 
Committed: Fri Feb 12 11:34:09 2016 +0100

--
 .../flink/api/table/plan/PlanGenException.scala |  26 
 .../flink/api/table/plan/TypeConverter.scala|  13 +-
 .../plan/functions/AggregateFunction.scala  |  71 +
 .../table/plan/functions/FunctionUtils.scala|  37 +
 .../plan/functions/aggregate/Aggregate.scala|  42 ++
 .../functions/aggregate/AggregateFactory.scala  | 135 +
 .../plan/functions/aggregate/AvgAggregate.scala | 148 +++
 .../functions/aggregate/CountAggregate.scala|  34 +
 .../plan/functions/aggregate/MaxAggregate.scala | 136 +
 .../plan/functions/aggregate/MinAggregate.scala | 136 +
 .../plan/functions/aggregate/SumAggregate.scala | 130 
 .../plan/nodes/dataset/DataSetGroupReduce.scala |   6 +-
 .../table/plan/nodes/dataset/DataSetJoin.scala  |   6 +-
 .../plan/nodes/dataset/DataSetReduce.scala  |   6 +-
 .../rules/dataset/DataSetAggregateRule.scala|  17 ++-
 .../plan/rules/dataset/DataSetJoinRule.scala| 102 -
 16 files changed, 1025 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6dd2d779/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanGenException.scala
--
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanGenException.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanGenException.scala
new file mode 100644
index 0000000..2fd400d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanGenException.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan
+
+class PlanGenException(message: String, exception: Exception) extends
+RuntimeException(message: String, exception: Exception){
+
+  def this(message: String){
+    this(message, null)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6dd2d779/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
--
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
index 227b3e8..b7cb200 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.api.table.plan
 
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.rel.core.JoinRelType._
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
@@ -29,8 +29,11 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo
 import org.apache.flink.api.java.typeutils.ValueTypeInfo._
 import org.apache.flink.api.table.typeinfo.RowTypeInfo
 import org.apache.flink.api.table.{Row, TableException}
-
 import scala.collection.JavaConversions._
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.calcite

[01/50] [abbrv] flink git commit: [FLINK-3334] [conf] Include year-month-day in the log output [Forced Update!]

2016-02-12 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/tableOnCalcite fff25df5e -> e74282632 (forced update)


[FLINK-3334] [conf] Include year-month-day in the log output


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d51bec15
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d51bec15
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d51bec15

Branch: refs/heads/tableOnCalcite
Commit: d51bec1524a9832a7e6c0f6c5d5be5a42712d365
Parents: 457cb14
Author: Stephan Ewen 
Authored: Mon Feb 8 17:38:52 2016 +0100
Committer: Stephan Ewen 
Committed: Mon Feb 8 19:59:31 2016 +0100

--
 flink-dist/src/main/flink-bin/conf/log4j-cli.properties| 4 ++--
 .../src/main/flink-bin/conf/log4j-yarn-session.properties  | 2 +-
 flink-dist/src/main/flink-bin/conf/log4j.properties| 2 +-
 flink-dist/src/main/flink-bin/conf/logback-yarn.xml| 6 +++---
 flink-dist/src/main/flink-bin/conf/logback.xml | 5 +++--
 5 files changed, 10 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/d51bec15/flink-dist/src/main/flink-bin/conf/log4j-cli.properties
--
diff --git a/flink-dist/src/main/flink-bin/conf/log4j-cli.properties b/flink-dist/src/main/flink-bin/conf/log4j-cli.properties
index 9c56e61..acb9d1a 100644
--- a/flink-dist/src/main/flink-bin/conf/log4j-cli.properties
+++ b/flink-dist/src/main/flink-bin/conf/log4j-cli.properties
@@ -23,7 +23,7 @@ log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.file=${log.file}
 log4j.appender.file.append=false
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
 
 
 # Log output from org.apache.flink.yarn to the console. This is used by the
@@ -34,7 +34,7 @@ log4j.logger.org.apache.hadoop=INFO, console
 
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
 
 # suppress the warning that hadoop native libraries are not loaded (irrelevant for the client)
 log4j.logger.org.apache.hadoop.util.NativeCodeLoader=OFF

http://git-wip-us.apache.org/repos/asf/flink/blob/d51bec15/flink-dist/src/main/flink-bin/conf/log4j-yarn-session.properties
--
diff --git a/flink-dist/src/main/flink-bin/conf/log4j-yarn-session.properties b/flink-dist/src/main/flink-bin/conf/log4j-yarn-session.properties
index 1f49676..07f65a7 100644
--- a/flink-dist/src/main/flink-bin/conf/log4j-yarn-session.properties
+++ b/flink-dist/src/main/flink-bin/conf/log4j-yarn-session.properties
@@ -21,7 +21,7 @@ log4j.rootLogger=INFO, stdout
 # Log all infos in the given file
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, stdout

http://git-wip-us.apache.org/repos/asf/flink/blob/d51bec15/flink-dist/src/main/flink-bin/conf/log4j.properties
--
diff --git a/flink-dist/src/main/flink-bin/conf/log4j.properties b/flink-dist/src/main/flink-bin/conf/log4j.properties
index adcff38..97ec653 100644
--- a/flink-dist/src/main/flink-bin/conf/log4j.properties
+++ b/flink-dist/src/main/flink-bin/conf/log4j.properties
@@ -23,7 +23,7 @@ log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.file=${log.file}
 log4j.appender.file.append=false
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file

http://git-wip-us.apache.org/repos/asf/flink/blob/d51bec15/flink-dist/src/main/flink-bin/conf/logback-yarn.xml
--
diff --git a/flink-dist/src/main/flink-bin/conf/logback-yarn.xml b/flink-dist/src/main/flink-bin/conf/logback-yar

[46/50] [abbrv] flink git commit: [FLINK-3226] implement GroupReduce translation; enable tests for supported operations

2016-02-12 Thread fhueske
http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
--
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
index f300547..c56ab92 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
@@ -77,7 +77,6 @@ class ExpressionsITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  // advanced functions not supported yet
   @Ignore
   @Test
   def testCaseInsensitiveForAs(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
--
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
index 50ce150..82c4dc2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
@@ -46,7 +46,7 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[NotImplementedError])
+  @Test
   def testGroupedAggregate(): Unit = {
 
 // the grouping key needs to be forwarded to the intermediate DataSet, even
@@ -62,7 +62,7 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[NotImplementedError])
+  @Test
   def testGroupingKeyForwardIfNotUsed(): Unit = {
 
 // the grouping key needs to be forwarded to the intermediate DataSet, even
@@ -78,7 +78,7 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[NotImplementedError])
+  @Test
   def testGroupNoAggregation(): Unit = {
 
 val env = ExecutionEnvironment.getExecutionEnvironment



[33/50] [abbrv] flink git commit: [hotfix][travis] deploy snapshots only for master branch

2016-02-12 Thread fhueske
[hotfix][travis] deploy snapshots only for master branch


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1dee62b4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1dee62b4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1dee62b4

Branch: refs/heads/tableOnCalcite
Commit: 1dee62b4b691fe29f6975844366b7328cf6c41d3
Parents: edae793
Author: Maximilian Michels 
Authored: Fri Feb 12 11:28:52 2016 +0100
Committer: Maximilian Michels 
Committed: Fri Feb 12 11:30:36 2016 +0100

--
 tools/deploy_to_maven.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/1dee62b4/tools/deploy_to_maven.sh
--
diff --git a/tools/deploy_to_maven.sh b/tools/deploy_to_maven.sh
index 8ace5aa..b2c6cce 100755
--- a/tools/deploy_to_maven.sh
+++ b/tools/deploy_to_maven.sh
@@ -72,7 +72,7 @@ pwd
 
 # Check if push/commit is eligible for deploying
 echo "Job: $TRAVIS_JOB_NUMBER ; isPR: $TRAVIS_PULL_REQUEST ; repo slug : 
$TRAVIS_REPO_SLUG "
-if [[ $TRAVIS_PULL_REQUEST == "false" ]] && [[ $TRAVIS_REPO_SLUG == 
"apache/flink" ]] ; then
+if [[ $TRAVIS_PULL_REQUEST == "false" ]] && [[ $TRAVIS_REPO_SLUG == 
"apache/flink" ]] && [[ $TRAVIS_BRANCH == "master" ]] ; then
 
echo "install lifecylce mapping fake plugin"
git clone 
https://github.com/mfriedenhagen/dummy-lifecycle-mapping-plugin.git



[13/50] [abbrv] flink git commit: [FLINK-3366] Rename @Experimental annotation to @PublicEvolving

2016-02-12 Thread fhueske
http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
--
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index 5dd2988..223ebee 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
@@ -183,7 +183,7 @@ public class RemoteEnvironment extends ExecutionEnvironment {
}
 
@Override
-   @Experimental
+   @PublicEvolving
public void startNewSession() throws Exception {
dispose();
jobID = JobID.generate();

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
--
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
index 2eda077..fdd114e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
@@ -17,6 +17,7 @@
  */
 
 package org.apache.flink.api.java.functions;
+
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.util.Collector;

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
--
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
index dd00c31..0ce518e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
@@ -26,7 +26,7 @@ import java.lang.annotation.Retention;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.InvalidProgramException;
@@ -310,7 +310,7 @@ public class FunctionAnnotation {
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
-   @Experimental
+   @PublicEvolving
public @interface ReadFields {
String[] value();
}
@@ -341,7 +341,7 @@ public class FunctionAnnotation {
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
-   @Experimental
+   @PublicEvolving
public @interface ReadFieldsFirst {
String[] value();
}
@@ -372,7 +372,7 @@ public class FunctionAnnotation {
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
-   @Experimental
+   @PublicEvolving
public @interface ReadFieldsSecond {
String[] value();
}
@@ -389,7 +389,7 @@ public class FunctionAnnotation {
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
-   @Experimental
+   @PublicEvolving
public @interface SkipCodeAnalysis {
}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
--
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index 9c6621d..3d656a4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 
 import org.apache.flink.annotation.Public;
-import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.operators.DataSource;
@@ -110,7 +110,7 @@ public class

[18/50] [abbrv] flink git commit: [FLINK-3341] Make 'auto.offset.reset' compatible with Kafka 0.8 and 0.9

2016-02-12 Thread fhueske
[FLINK-3341] Make 'auto.offset.reset' compatible with Kafka 0.8 and 0.9

This closes #1597
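
The compatibility shim in LegacyFetcher (diff below) accepts both the Kafka 0.8 value set ("largest"/"smallest") and the 0.9 one ("latest"/"earliest"). A reduced Java sketch of that decision, with the kafka.api.OffsetRequest return values replaced by stand-in constants for illustration:

    import java.util.Properties;

    // Reduced sketch of getInvalidOffsetBehavior from the diff below; the real
    // code returns kafka.api.OffsetRequest times, replaced here by stand-ins.
    final class OffsetResetCompat {
        static final long LATEST_TIME = -1L;   // stand-in for OffsetRequest.LatestTime()
        static final long EARLIEST_TIME = -2L; // stand-in for OffsetRequest.EarliestTime()

        static long getInvalidOffsetBehavior(Properties config) {
            // "largest" is the Kafka 0.8 spelling, "latest" the Kafka 0.9 one.
            String val = config.getProperty("auto.offset.reset", "largest");
            return (val.equals("largest") || val.equals("latest")) ? LATEST_TIME : EARLIEST_TIME;
        }
    }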


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9173825a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9173825a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9173825a

Branch: refs/heads/tableOnCalcite
Commit: 9173825aa6a1525d72a78cda16cb4ae1e9b8a8e4
Parents: 8ccd754
Author: Robert Metzger 
Authored: Sat Feb 6 13:27:06 2016 +0100
Committer: Stephan Ewen 
Committed: Wed Feb 10 15:12:34 2016 +0100

--
 .../connectors/kafka/FlinkKafkaConsumer08.java  |  2 +-
 .../kafka/internals/LegacyFetcher.java  |  3 ++-
 .../connectors/kafka/Kafka08ITCase.java | 22 ++--
 .../kafka/KafkaTestEnvironmentImpl.java | 20 +++---
 .../kafka/KafkaTestEnvironmentImpl.java | 22 +---
 .../connectors/kafka/KafkaConsumerTestBase.java |  5 ++---
 .../connectors/kafka/KafkaTestBase.java |  2 --
 .../connectors/kafka/KafkaTestEnvironment.java  |  3 ---
 .../flink/yarn/YARNSessionFIFOITCase.java   |  1 +
 9 files changed, 25 insertions(+), 55 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
--
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index bdea37f..1cdfffe 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -70,7 +70,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
  * socket.timeout.ms
  * socket.receive.buffer.bytes
  * fetch.message.max.bytes
- * auto.offset.reset with the values "latest", "earliest" (unlike 0.8.2 behavior)
+ * auto.offset.reset with the values "largest", "smallest"
  * fetch.wait.max.ms
  * 
  * 

http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
--
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
index fe7f777..10f4c41 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
@@ -576,7 +576,8 @@ public class LegacyFetcher implements Fetcher {
 
 	private static long getInvalidOffsetBehavior(Properties config) {
 		long timeType;
-		if (config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest").equals("latest")) {
+		String val = config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest");
+		if (val.equals("largest") || val.equals("latest")) { // largest is kafka 0.8, latest is kafka 0.9
timeType = OffsetRequest.LatestTime();
} else {
timeType = OffsetRequest.EarliestTime();

http://git-wip-us.apache.org/repos/asf/flink/blob/9173825a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
--
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 6a2fa27..a3e815e 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-streaming-con

[16/50] [abbrv] flink git commit: [docs] Fixed typo in Storm Compatibility page

2016-02-12 Thread fhueske
[docs] Fixed typo in Storm Compatibility page

This closes #1618


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aeee6efd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aeee6efd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aeee6efd

Branch: refs/heads/tableOnCalcite
Commit: aeee6efd451dcbdda37ed05779bda74291079ed5
Parents: 0a63797
Author: Georgios Andrianakis 
Authored: Wed Feb 10 14:30:46 2016 +0200
Committer: mjsax 
Committed: Wed Feb 10 14:51:14 2016 +0100

--
 docs/apis/streaming/storm_compatibility.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/aeee6efd/docs/apis/streaming/storm_compatibility.md
--
diff --git a/docs/apis/streaming/storm_compatibility.md b/docs/apis/streaming/storm_compatibility.md
index d646040..4d5715c 100644
--- a/docs/apis/streaming/storm_compatibility.md
+++ b/docs/apis/streaming/storm_compatibility.md
@@ -228,8 +228,8 @@ DataStream<SplitStreamType<SomeType>> multiStream = ...
 SplitStream<SplitStreamType<SomeType>> splitStream = multiStream.split(new StormStreamSelector<SomeType>());
 
 // remove SplitStreamType using SplitStreamMapper to get data stream of type SomeType
-DataStream<SomeType> s1 = splitStream.select("s1").map(new SplitStreamMapper<SomeType>()).returns(SomeType.classs);
-DataStream<SomeType> s2 = splitStream.select("s2").map(new SplitStreamMapper<SomeType>()).returns(SomeType.classs);
+DataStream<SomeType> s1 = splitStream.select("s1").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
+DataStream<SomeType> s2 = splitStream.select("s2").map(new SplitStreamMapper<SomeType>()).returns(SomeType.class);
 
 // do further processing on s1 and s2
 [...]



[31/50] [abbrv] flink git commit: [FLINK-3389] [rocksdb] Add pre-defined option profiles.

2016-02-12 Thread fhueske
[FLINK-3389] [rocksdb] Add pre-defined option profiles.
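
Putting the two pieces together, usage would look roughly as follows. The OptionsFactory shape and the setOptions call are taken from the javadoc in the diff; setPredefinedOptions and the SPINNING_DISK_OPTIMIZED profile are inferred from this commit's file list and should be treated as assumptions:

    import org.apache.flink.contrib.streaming.state.OptionsFactory;
    import org.apache.flink.contrib.streaming.state.PredefinedOptions;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.rocksdb.Options;

    // Illustrative wiring of a pre-defined profile plus user refinements.
    // setPredefinedOptions(...) and SPINNING_DISK_OPTIMIZED are assumptions
    // inferred from this commit's summary, not quoted from the diff.
    public class RocksDbOptionsExample {
        public static void configure(RocksDBStateBackend rocksDbBackend) {
            // Start from a profile tuned for a workload class...
            rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
            // ...then refine on top of the current options, returning the same
            // object so the pre-defined settings are not lost (see the javadoc below).
            rocksDbBackend.setOptions(new OptionsFactory() {
                @Override
                public Options createOptions(Options currentOptions) {
                    return currentOptions.setMaxOpenFiles(1024);
                }
            });
        }
    }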


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/be72758d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/be72758d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/be72758d

Branch: refs/heads/tableOnCalcite
Commit: be72758d1104400c8a48554d717c5b8cea5b3617
Parents: 82c7383
Author: Stephan Ewen 
Authored: Thu Feb 11 15:30:56 2016 +0100
Committer: Stephan Ewen 
Committed: Thu Feb 11 21:34:03 2016 +0100

--
 .../contrib/streaming/state/OptionsFactory.java | 32 ++-
 .../streaming/state/PredefinedOptions.java  | 91 
 .../streaming/state/RocksDBStateBackend.java| 76 ++--
 3 files changed, 190 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/be72758d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
--
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
index 73b1e5d..3e52f1f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
@@ -24,8 +24,36 @@ import org.rocksdb.Options;
  * A factory for {@link Options} to be passed to the {@link RocksDBStateBackend}.
  * Options have to be created lazily by this factory, because the {@code Options}
  * class is not serializable and holds pointers to native code.
+ * 
+ * A typical pattern to use this OptionsFactory is as follows:
+ * 
+ * Java 8:
+ * {@code
+ * rocksDbBackend.setOptions( (currentOptions) -> currentOptions.setMaxOpenFiles(1024) );
+ * }
+ * 
+ * Java 7:
+ * {@code
+ * rocksDbBackend.setOptions(new OptionsFactory() {
+ * 
+ * public Options setOptions(Options currentOptions) {
+ * return currentOptions.setMaxOpenFiles(1024);
+ * }
+ * })
+ * }
  */
 public interface OptionsFactory extends java.io.Serializable {
-   
-   Options createOptions();
+
+   /**
+	 * This method should set the additional options on top of the current options object.
+	 * The current options object may contain pre-defined options based on flags that have
+	 * been configured on the state backend.
+	 * 
+	 * It is important to set the options on the current object and return the result from
+	 * the setter methods, otherwise the pre-defined options may get lost.
+	 * 
+	 * @param currentOptions The options object with the pre-defined options.
+* @return The options object on which the additional options are set.
+*/
+   Options createOptions(Options currentOptions);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/be72758d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
--
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
new file mode 100644
index 0000000..383f043
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.Options;
+
+/**
+ * The {@code PredefinedOptions} are configuration settings for the {@link RocksDBStateBackend}.
+ * The various 

[19/50] [abbrv] flink git commit: [FLINK-3350] [tests] Increase default test Akka ask and ZooKeeper timeouts

2016-02-12 Thread fhueske
[FLINK-3350] [tests] Increase default test Akka ask and ZooKeeper timeouts
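
The pattern in setDefaultCiConfig (diff below) is: raise a default only when running on CI, and only when the user has not set the key. The same guard as a Java sketch, with hypothetical key and default names standing in for ConfigConstants.AKKA_ASK_TIMEOUT and its default:

    import java.util.Map;

    // Sketch of the CI-only default raising in the diff below; KEY and
    // DEFAULT_SECONDS are hypothetical stand-ins for Flink's ConfigConstants.
    final class CiDefaults {
        static final String KEY = "akka.ask.timeout";
        static final int DEFAULT_SECONDS = 10;

        static void setDefaultCiConfig(Map<String, String> config) {
            // Travis exports CI=true; see the Travis default environment variables docs.
            if (System.getenv("CI") != null && !config.containsKey(KEY)) {
                // Ten times the normal default, mirroring the "* 10" in the Scala diff.
                config.put(KEY, (DEFAULT_SECONDS * 10) + "s");
            }
        }
    }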


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8f40251
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8f40251
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8f40251

Branch: refs/heads/tableOnCalcite
Commit: b8f40251c6c45379118254c21b0d553c2d3b8937
Parents: 9173825
Author: Ufuk Celebi 
Authored: Mon Feb 8 14:24:43 2016 +0100
Committer: Stephan Ewen 
Committed: Wed Feb 10 15:26:43 2016 +0100

--
 .../runtime/minicluster/FlinkMiniCluster.scala  | 20 ++--
 .../minicluster/LocalFlinkMiniCluster.scala |  2 ++
 .../runtime/testutils/ZooKeeperTestUtils.java   |  5 +++--
 .../runtime/testingUtils/TestingCluster.scala   |  2 ++
 .../kafka/KafkaTestEnvironmentImpl.java |  7 ---
 .../kafka/KafkaTestEnvironmentImpl.java |  7 ---
 ...ctTaskManagerProcessFailureRecoveryTest.java |  3 +++
 .../JobManagerCheckpointRecoveryITCase.java |  8 ++--
 .../recovery/ProcessFailureCancelingITCase.java |  2 +-
 9 files changed, 43 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
--
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 4cdda3f..0346d6d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -42,7 +42,7 @@ import org.apache.flink.runtime.webmonitor.{WebMonitorUtils, WebMonitor}
 
 import org.slf4j.LoggerFactory
 
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration.{Duration, FiniteDuration}
 import scala.concurrent._
 import scala.concurrent.forkjoin.ForkJoinPool
 
@@ -86,7 +86,7 @@ abstract class FlinkMiniCluster(
   
   implicit val executionContext = ExecutionContext.global
 
-  implicit val timeout = AkkaUtils.getTimeout(userConfiguration)
+  implicit val timeout = AkkaUtils.getTimeout(configuration)
 
   val recoveryMode = RecoveryMode.fromConfig(configuration)
 
@@ -188,6 +188,22 @@ abstract class FlinkMiniCluster(
 AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort)))
   }
 
+  /**
+    * Sets CI environment (Travis) specific config defaults.
+    */
+  def setDefaultCiConfig(config: Configuration) : Unit = {
+    // https://docs.travis-ci.com/user/environment-variables#Default-Environment-Variables
+    if (sys.env.contains("CI")) {
+      // Only set if nothing specified in config
+      if (config.getString(ConfigConstants.AKKA_ASK_TIMEOUT, null) == null) {
+        val duration = Duration(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT) * 10
+        config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, s"${duration.toSeconds}s")
+
+        LOG.info(s"Akka ask timeout set to ${duration.toSeconds}s")
+      }
+    }
+  }
+
   // --
   //  Start/Stop Methods
   // --

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
--
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 913aec0..c803429 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -48,6 +48,8 @@ class LocalFlinkMiniCluster(
   override def generateConfiguration(userConfiguration: Configuration): Configuration = {
 val config = getDefaultConfig
 
+setDefaultCiConfig(config)
+
 config.addAll(userConfiguration)
 setMemory(config)
 initializeIOFormatClasses(config)

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f40251/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
--
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index 6c33835..75569ec 100644
--- a/fl

[14/50] [abbrv] flink git commit: [FLINK-3366] Rename @Experimental annotation to @PublicEvolving

2016-02-12 Thread fhueske
[FLINK-3366] Rename @Experimental annotation to @PublicEvolving

This closes #1599
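
For context, such an annotation is just a retained marker type. A minimal sketch of a @PublicEvolving-style annotation follows; the diff only shows call sites being renamed, so the @Target and @Retention values here are assumptions:

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    // Minimal marker-annotation sketch. The @Target/@Retention values are
    // assumed for illustration; the diff only renames usages.
    @Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR})
    @Retention(RetentionPolicy.RUNTIME)
    public @interface PublicEvolving {
        // Marker only: signals a public API whose signature may still change
        // across minor releases, in contrast to a stable @Public API.
    }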


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/572855da
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/572855da
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/572855da

Branch: refs/heads/tableOnCalcite
Commit: 572855daad452eab169bc2ca27f9f1e4476df656
Parents: 59b237b
Author: Fabian Hueske 
Authored: Mon Feb 8 14:14:01 2016 +0100
Committer: Fabian Hueske 
Committed: Wed Feb 10 11:51:26 2016 +0100

--
 .../apache/flink/annotation/Experimental.java   | 35 ---
 .../apache/flink/annotation/PublicEvolving.java | 40 
 .../flink/api/common/ExecutionConfig.java   | 16 ++---
 .../flink/api/common/JobExecutionResult.java|  4 +-
 .../functions/IterationRuntimeContext.java  |  4 +-
 .../api/common/functions/RuntimeContext.java| 22 +++
 .../util/AbstractRuntimeUDFContext.java | 12 ++--
 .../common/io/statistics/BaseStatistics.java| 14 ++---
 .../api/common/typeinfo/BasicArrayTypeInfo.java | 22 +++
 .../api/common/typeinfo/BasicTypeInfo.java  | 22 +++
 .../api/common/typeinfo/NothingTypeInfo.java| 16 ++---
 .../common/typeinfo/PrimitiveArrayTypeInfo.java | 24 +++
 .../api/common/typeinfo/TypeInformation.java| 20 +++---
 .../api/common/typeutils/CompositeType.java | 36 +--
 .../flink/api/java/typeutils/AvroTypeInfo.java  |  4 +-
 .../api/java/typeutils/EitherTypeInfo.java  | 18 +++---
 .../flink/api/java/typeutils/EnumTypeInfo.java  | 20 +++---
 .../api/java/typeutils/GenericTypeInfo.java | 20 +++---
 .../api/java/typeutils/ObjectArrayTypeInfo.java | 22 +++
 .../flink/api/java/typeutils/PojoTypeInfo.java  | 28 -
 .../flink/api/java/typeutils/TupleTypeInfo.java | 16 ++---
 .../flink/api/java/typeutils/TypeExtractor.java | 66 ++--
 .../flink/api/java/typeutils/ValueTypeInfo.java | 24 +++
 .../api/java/typeutils/WritableTypeInfo.java| 22 +++
 .../java/org/apache/flink/api/java/DataSet.java |  6 +-
 .../flink/api/java/ExecutionEnvironment.java| 32 +-
 .../apache/flink/api/java/LocalEnvironment.java |  4 +-
 .../flink/api/java/RemoteEnvironment.java   |  4 +-
 .../flink/api/java/functions/FirstReducer.java  |  1 +
 .../api/java/functions/FunctionAnnotation.java  | 10 +--
 .../org/apache/flink/api/java/io/CsvReader.java |  4 +-
 .../flink/api/java/operators/CrossOperator.java |  4 +-
 .../flink/api/java/operators/DataSink.java  |  6 +-
 .../flink/api/java/operators/DataSource.java|  4 +-
 .../api/java/operators/DeltaIteration.java  |  6 +-
 .../api/java/operators/IterativeDataSet.java|  8 +--
 .../flink/api/java/operators/JoinOperator.java  |  4 +-
 .../api/java/operators/ProjectOperator.java |  4 +-
 .../flink/api/java/utils/DataSetUtils.java  |  4 +-
 .../flink/api/java/utils/ParameterTool.java |  4 +-
 .../org/apache/flink/api/scala/DataSet.scala|  8 +--
 .../flink/api/scala/ExecutionEnvironment.scala  | 34 +-
 .../api/scala/typeutils/CaseClassTypeInfo.scala | 16 ++---
 .../api/scala/typeutils/EitherTypeInfo.scala| 18 +++---
 .../api/scala/typeutils/EnumValueTypeInfo.scala | 20 +++---
 .../api/scala/typeutils/OptionTypeInfo.scala| 18 +++---
 .../scala/typeutils/ScalaNothingTypeInfo.scala  | 16 ++---
 .../scala/typeutils/TraversableTypeInfo.scala   | 18 +++---
 .../flink/api/scala/typeutils/TryTypeInfo.scala | 18 +++---
 .../api/scala/typeutils/UnitTypeInfo.scala  | 16 ++---
 .../apache/flink/api/scala/utils/package.scala  |  5 +-
 .../api/datastream/AllWindowedStream.java   |  8 +--
 .../api/datastream/CoGroupedStreams.java|  8 +--
 .../api/datastream/ConnectedStreams.java|  4 +-
 .../streaming/api/datastream/DataStream.java| 46 +++---
 .../api/datastream/DataStreamSink.java  |  6 +-
 .../api/datastream/IterativeStream.java |  4 +-
 .../streaming/api/datastream/JoinedStreams.java | 10 +--
 .../streaming/api/datastream/KeyedStream.java   |  6 +-
 .../datastream/SingleOutputStreamOperator.java  | 20 +++---
 .../streaming/api/datastream/SplitStream.java   |  4 +-
 .../api/datastream/WindowedStream.java  |  8 +--
 .../api/environment/CheckpointConfig.java   |  6 +-
 .../environment/StreamExecutionEnvironment.java | 36 +--
 .../source/EventTimeSourceFunction.java |  4 +-
 .../api/functions/source/SourceFunction.java|  6 +-
 .../streaming/api/scala/AllWindowedStream.scala |  6 +-
 .../streaming/api/scala/CoGroupedStreams.scala  | 10 +--
 .../flink/streaming/api/scala/DataStream.scala  | 57 -
 .../streaming/api/scala/JoinedStreams.scala |  8 +--
 .../flink/streaming/api/scala/KeyedStream.scala |  4 +-
 .../api/scala/StreamExecutionEnvironment.scala  | 28 -
 .../streaming/api/scala/Windo
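
For reference, usage of the renamed annotation is unchanged apart from the name.
An illustrative example (not a file from this commit):

import org.apache.flink.annotation.PublicEvolving;

/**
 * Illustrative class: {@code @PublicEvolving} marks an API as publicly
 * visible but still evolving between releases, which is the role the old
 * {@code @Experimental} annotation played.
 */
@PublicEvolving
public class SomeEvolvingApi {
	public void doWork() {
		// ...
	}
}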

[42/50] [abbrv] flink git commit: [FLINK-3225] Implemented optimization of Table API queries via Calcite

2016-02-12 Thread fhueske
[FLINK-3225] Implemented optimization of Table API queries via Calcite

- added logical Flink nodes and translation rules
- added stubs for DataSet translation rules
- ported DataSetNodes to Scala
- reactivated tests and added expected NotImplementedError


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe5e4065
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe5e4065
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe5e4065

Branch: refs/heads/tableOnCalcite
Commit: fe5e4065643dcb208f3990897f171780311502e9
Parents: 20235e0
Author: Fabian Hueske 
Authored: Tue Jan 26 13:22:38 2016 +0100
Committer: Fabian Hueske 
Committed: Fri Feb 12 11:34:09 2016 +0100

--
 .../apache/flink/api/table/package-info.java|  33 -
 .../api/table/sql/calcite/DataSetRelNode.java   |  29 -
 .../table/sql/calcite/node/DataSetExchange.java |  60 --
 .../table/sql/calcite/node/DataSetFlatMap.java  |  56 -
 .../api/table/sql/calcite/node/DataSetJoin.java |  80 -
 .../api/table/sql/calcite/node/DataSetMap.java  |  58 -
 .../table/sql/calcite/node/DataSetReduce.java   |  58 -
 .../sql/calcite/node/DataSetReduceGroup.java|  62 --
 .../api/table/sql/calcite/node/DataSetSort.java |  59 -
 .../table/sql/calcite/node/DataSetSource.java   |  55 -
 .../table/sql/calcite/node/DataSetUnion.java|  51 
 .../api/java/table/JavaBatchTranslator.scala|  68 +++
 .../api/table/plan/TranslationContext.scala |  79 
 .../plan/nodes/dataset/DataSetConvention.scala  |  42 +++
 .../plan/nodes/dataset/DataSetExchange.scala|  63 ++
 .../plan/nodes/dataset/DataSetFlatMap.scala |  62 ++
 .../plan/nodes/dataset/DataSetGroupReduce.scala |  63 ++
 .../table/plan/nodes/dataset/DataSetJoin.scala  |  73 +++
 .../table/plan/nodes/dataset/DataSetMap.scala   |  63 ++
 .../plan/nodes/dataset/DataSetReduce.scala  |  63 ++
 .../table/plan/nodes/dataset/DataSetRel.scala   |  33 +
 .../table/plan/nodes/dataset/DataSetSort.scala  |  62 ++
 .../plan/nodes/dataset/DataSetSource.scala  |  55 +
 .../table/plan/nodes/dataset/DataSetUnion.scala |  62 ++
 .../plan/nodes/logical/FlinkAggregate.scala |  76 
 .../table/plan/nodes/logical/FlinkCalc.scala|  37 ++
 .../plan/nodes/logical/FlinkConvention.scala|  42 +++
 .../table/plan/nodes/logical/FlinkFilter.scala  |  42 +++
 .../table/plan/nodes/logical/FlinkJoin.scala|  46 +++
 .../table/plan/nodes/logical/FlinkProject.scala |  45 +++
 .../api/table/plan/nodes/logical/FlinkRel.scala |  25 
 .../table/plan/nodes/logical/FlinkScan.scala|  31 +
 .../table/plan/nodes/logical/FlinkUnion.scala   |  38 ++
 .../api/table/plan/operators/DataSetTable.scala |  66 --
 .../api/table/plan/rules/FlinkRuleSets.scala| 120 +++
 .../rules/dataset/DataSetAggregateRule.scala|  53 
 .../plan/rules/dataset/DataSetCalcRule.scala|  52 
 .../plan/rules/dataset/DataSetFilterRule.scala  |  52 
 .../plan/rules/dataset/DataSetJoinRule.scala|  59 +
 .../plan/rules/dataset/DataSetProjectRule.scala |  52 
 .../plan/rules/dataset/DataSetScanRule.scala|  53 
 .../plan/rules/dataset/DataSetUnionRule.scala   |  53 
 .../plan/rules/logical/FlinkAggregateRule.scala |  53 
 .../plan/rules/logical/FlinkCalcRule.scala  |  50 
 .../plan/rules/logical/FlinkFilterRule.scala|  50 
 .../plan/rules/logical/FlinkJoinRule.scala  |  54 +
 .../plan/rules/logical/FlinkProjectRule.scala   |  51 
 .../plan/rules/logical/FlinkScanRule.scala  |  53 
 .../plan/rules/logical/FlinkUnionRule.scala |  54 +
 .../api/table/plan/schema/DataSetTable.scala|  89 ++
 .../org/apache/flink/api/table/table.scala  |   1 -
 .../api/java/table/test/AggregationsITCase.java |  72 +--
 .../flink/api/java/table/test/AsITCase.java |  62 +-
 .../api/java/table/test/CastingITCase.java  |  53 
 .../api/java/table/test/ExpressionsITCase.java  |  32 ++---
 .../flink/api/java/table/test/FilterITCase.java |  69 +--
 .../table/test/GroupedAggregationsITCase.java   |  40 +++
 .../flink/api/java/table/test/JoinITCase.java   |  70 ++-
 .../api/java/table/test/PojoGroupingITCase.java |  19 +--
 .../flink/api/java/table/test/SelectITCase.java |  68 +--
 .../table/test/StringExpressionsITCase.java |  40 ---
 .../flink/api/java/table/test/UnionITCase.java  |  47 
 .../scala/table/test/AggregationsITCase.scala   |  59 +
 .../flink/api/scala/table/test/AsITCase.scala   |  50 
 .../api/scala/table/test/CastingITCase.scal

[25/50] [abbrv] flink git commit: [FLINK-3371] [api-breaking] Move TriggerResult and TriggerContext to dedicated classes

2016-02-12 Thread fhueske
[FLINK-3371] [api-breaking] Move TriggerResult and TriggerContext to dedicated 
classes

This closes #1603


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50bd65a5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50bd65a5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50bd65a5

Branch: refs/heads/tableOnCalcite
Commit: 50bd65a574776817a03dd32fd438cb2327447109
Parents: 8df0bba
Author: Stephan Ewen 
Authored: Sun Feb 7 21:46:16 2016 +0100
Committer: Stephan Ewen 
Committed: Wed Feb 10 22:15:31 2016 +0100

--
 .../examples/windowing/SessionWindowing.java|   3 +-
 .../api/windowing/assigners/GlobalWindows.java  |  10 +-
 .../triggers/ContinuousEventTimeTrigger.java|   7 +-
 .../ContinuousProcessingTimeTrigger.java|   2 +-
 .../api/windowing/triggers/CountTrigger.java|   2 +-
 .../api/windowing/triggers/DeltaTrigger.java|   7 +-
 .../windowing/triggers/EventTimeTrigger.java|   5 +-
 .../triggers/ProcessingTimeTrigger.java |   5 +-
 .../api/windowing/triggers/PurgingTrigger.java  |   4 +-
 .../api/windowing/triggers/Trigger.java | 102 +--
 .../api/windowing/triggers/TriggerResult.java   |  96 +
 .../windowing/EvictingWindowOperator.java   |   5 +-
 .../windowing/NonKeyedWindowOperator.java   |  34 ---
 .../operators/windowing/WindowOperator.java |  18 ++--
 14 files changed, 179 insertions(+), 121 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
--
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index bd82800..e2df160 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 
 import java.util.ArrayList;
@@ -95,7 +96,7 @@ public class SessionWindowing {
env.execute();
}
 
-   private static class SessionTrigger implements Trigger, GlobalWindow> {
+   private static class SessionTrigger extends Trigger, GlobalWindow> {
 
private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index d3eb2ac..a4d92cf 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 
 import java.util.Collection;
@@ -67,15 +68,12 @@ public class GlobalWindows extends WindowAssigner {
/**
 * A trigger that never fires, as default Trigger for GlobalWindows.
 */
-   private static class NeverTrigger implements Trigger {
+   private static class NeverTrigger extends Trigger 
{
private static final long serialVersionUID = 1L;
 
@Override
-   public TriggerResult onElement(Object element,
-   long timestamp,
-   GlobalWindow window,
-
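
With TriggerResult extracted to a top-level class, a user-defined trigger now
extends the abstract Trigger class instead of implementing an interface. A
minimal sketch (fires on every element; the callback signatures follow the 1.0
API and are an assumption for the intermediate state of this commit, and
TriggerContext appears to remain nested in Trigger since the diffstat adds only
TriggerResult.java):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class FireOnEveryElementTrigger extends Trigger<Object, GlobalWindow> {

	private static final long serialVersionUID = 1L;

	@Override
	public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
		return TriggerResult.FIRE; // evaluate the window for every element
	}

	@Override
	public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
		return TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
		return TriggerResult.CONTINUE;
	}
}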

[03/50] [abbrv] flink git commit: [FLINK-3357] [core] Drop AbstractID#toShortString()

2016-02-12 Thread fhueske
[FLINK-3357] [core] Drop AbstractID#toShortString()

This closes #1601


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28feede7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28feede7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28feede7

Branch: refs/heads/tableOnCalcite
Commit: 28feede7d40dc73ec861cf93393650b8b10afc3a
Parents: 5c47f38
Author: Ufuk Celebi 
Authored: Mon Feb 8 16:05:27 2016 +0100
Committer: Stephan Ewen 
Committed: Mon Feb 8 20:18:20 2016 +0100

--
 .../streaming/state/RocksDBStateBackend.java|  4 +--
 .../contrib/streaming/state/DbStateBackend.java | 35 ++--
 .../streaming/state/DbStateBackendTest.java | 10 +++---
 .../java/org/apache/flink/util/AbstractID.java  | 15 -
 ...taskExecutionAttemptAccumulatorsHandler.java |  2 +-
 .../InputGateDeploymentDescriptor.java  |  2 +-
 .../io/network/partition/ResultPartitionID.java |  2 +-
 7 files changed, 28 insertions(+), 42 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
--
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index aaaeea4..eefa4a9 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -81,11 +81,11 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
}
 
private File getDbPath(String stateName) {
-   return new File(new File(new File(new File(dbBasePath), 
jobId.toShortString()), operatorIdentifier), stateName);
+   return new File(new File(new File(new File(dbBasePath), 
jobId.toString()), operatorIdentifier), stateName);
}
 
private String getCheckpointPath(String stateName) {
-   return checkpointDirectory + "/" + jobId.toShortString() + "/" 
+ operatorIdentifier + "/" + stateName;
+   return checkpointDirectory + "/" + jobId.toString() + "/" + 
operatorIdentifier + "/" + stateName;
}
 
@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/28feede7/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
--
diff --git 
a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
 
b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
index 1d1ccd7..5162983 100644
--- 
a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
+++ 
b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java
@@ -17,14 +17,6 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import java.io.Serializable;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Random;
-import java.util.concurrent.Callable;
-
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -42,6 +34,14 @@ import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.Callable;
+
 import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry;
 
 /**
@@ -76,6 +76,8 @@ public class DbStateBackend extends AbstractStateBackend {
 
private transient Environment env;
 
+   private transient String appId;
+
// --
 
private final DbBackendConfig dbConfig;
@@ -159,19 +161,14 @@ public class DbStateBackend extends AbstractStateBackend {
// store the checkpoint id and 
timestamp for bookkeeping
long handleId = rnd.nextLong();
 
-   // We use the ApplicationID here, 
because 

[40/50] [abbrv] flink git commit: [FLINK-3225] Implemented optimization of Table API queries via Calcite

2016-02-12 Thread fhueske
http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
--
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
index da82b8a..a3d31da 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.java.table.test;
 
-import org.apache.flink.api.table.ExpressionException;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.Row;
 import org.apache.flink.api.java.DataSet;
@@ -30,6 +29,7 @@ import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import scala.NotImplementedError;
 
 import java.util.List;
 
@@ -41,7 +41,7 @@ public class SelectITCase extends MultipleProgramsTestBase {
super(mode);
}
 
-   @Test
+   @Test(expected = NotImplementedError.class)
public void testSimpleSelectAllWithAs() throws Exception {
ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
@@ -53,19 +53,19 @@ public class SelectITCase extends MultipleProgramsTestBase {
Table result = in
.select("a, b, c");
 
-// DataSet resultSet = tableEnv.toDataSet(result, Row.class);
-// List results = resultSet.collect();
-// String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello 
world\n" + "4,3,Hello world, " +
-// "how are you?\n" + "5,3,I am fine.\n" + 
"6,3,Luke Skywalker\n" + "7,4," +
-// "Comment#1\n" + "8,4,Comment#2\n" + 
"9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-// "Comment#5\n" + "12,5,Comment#6\n" + 
"13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-// "Comment#9\n" + "16,6,Comment#10\n" + 
"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-// "6,Comment#13\n" + "20,6,Comment#14\n" + 
"21,6,Comment#15\n";
-// compareResultAsText(results, expected);
+   DataSet resultSet = tableEnv.toDataSet(result, Row.class);
+   List results = resultSet.collect();
+   String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello 
world\n" + "4,3,Hello world, " +
+   "how are you?\n" + "5,3,I am fine.\n" + 
"6,3,Luke Skywalker\n" + "7,4," +
+   "Comment#1\n" + "8,4,Comment#2\n" + 
"9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+   "Comment#5\n" + "12,5,Comment#6\n" + 
"13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+   "Comment#9\n" + "16,6,Comment#10\n" + 
"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+   "6,Comment#13\n" + "20,6,Comment#14\n" + 
"21,6,Comment#15\n";
+   compareResultAsText(results, expected);
 
}
 
-   @Test
+   @Test(expected = NotImplementedError.class)
public void testSimpleSelectWithNaming() throws Exception {
ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
@@ -78,12 +78,12 @@ public class SelectITCase extends MultipleProgramsTestBase {
.select("f0 as a, f1 as b")
.select("a, b");
 
-// DataSet resultSet = tableEnv.toDataSet(result, Row.class);
-// List results = resultSet.collect();
-// String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + 
"5,3\n" + "6,3\n" + "7,4\n" +
-// "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + 
"12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-// "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + 
"20,6\n" + "21,6\n";
-// compareResultAsText(results, expected);
+   DataSet resultSet = tableEnv.toDataSet(result, Row.class);
+   List results = resultSet.collect();
+   String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + 
"5,3\n" + "6,3\n" + "7,4\n" +
+   "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + 
"12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+   "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + 
"20,6\n" + "21,6\n";
+   compareResultAsText(results, expected);
}
 
@Tes

[11/50] [abbrv] flink git commit: [build system] Skip javadoc generation for java8 and snapshot deployments

2016-02-12 Thread fhueske
[build system] Skip javadoc generation for java8 and snapshot deployments


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59b237b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59b237b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59b237b5

Branch: refs/heads/tableOnCalcite
Commit: 59b237b5d2169d50f886af64df880cb614408890
Parents: de231d7
Author: Robert Metzger 
Authored: Tue Feb 9 23:50:45 2016 +0100
Committer: Robert Metzger 
Committed: Tue Feb 9 23:50:45 2016 +0100

--
 .travis.yml  | 4 ++--
 tools/deploy_to_maven.sh | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/59b237b5/.travis.yml
--
diff --git a/.travis.yml b/.travis.yml
index 50b5fab..de3 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -17,9 +17,9 @@ language: java
 matrix:
   include:
 - jdk: "oraclejdk8"
-  env: PROFILE="-Dhadoop.version=2.7.1 -Dscala-2.11 -Pinclude-yarn-tests"
+  env: PROFILE="-Dhadoop.version=2.7.1 -Dscala-2.11 -Pinclude-yarn-tests 
-Dmaven.javadoc.skip=true"
 - jdk: "oraclejdk8"
-  env: PROFILE="-Dhadoop.version=2.5.0 -Pinclude-yarn-tests"
+  env: PROFILE="-Dhadoop.version=2.5.0 -Pinclude-yarn-tests 
-Dmaven.javadoc.skip=true"
 - jdk: "openjdk7"
   env: PROFILE="-Dhadoop.version=2.4.0 -Dscala-2.11 -Pinclude-yarn-tests"
 - jdk: "oraclejdk7" # this uploads the Hadoop 2 build to Maven and S3

http://git-wip-us.apache.org/repos/asf/flink/blob/59b237b5/tools/deploy_to_maven.sh
--
diff --git a/tools/deploy_to_maven.sh b/tools/deploy_to_maven.sh
index 6dcc806..8ace5aa 100755
--- a/tools/deploy_to_maven.sh
+++ b/tools/deploy_to_maven.sh
@@ -99,7 +99,7 @@ if [[ $TRAVIS_PULL_REQUEST == "false" ]] && [[ 
$TRAVIS_REPO_SLUG == "apache/flin
# Deploy hadoop v1 to maven
echo "Generating poms for hadoop1"
./tools/generate_specific_pom.sh $CURRENT_FLINK_VERSION 
$CURRENT_FLINK_VERSION_HADOOP1 pom.hadoop1.xml
-   mvn -B -f pom.hadoop1.xml -Pdocs-and-source -DskipTests 
-Drat.ignoreErrors=true deploy --settings deploysettings.xml; 
+   mvn -B -f pom.hadoop1.xml -DskipTests -Drat.ignoreErrors=true 
deploy --settings deploysettings.xml; 
 
# deploy to s3
deploy_to_s3 $CURRENT_FLINK_VERSION "hadoop1"
@@ -116,13 +116,13 @@ if [[ $TRAVIS_PULL_REQUEST == "false" ]] && [[ 
$TRAVIS_REPO_SLUG == "apache/flin
# deploy hadoop v2 (yarn)
echo "deploy standard version (hadoop2) for scala 2.10 from 
flink2 directory"
# do the hadoop2 scala 2.10 in the background
-   (mvn -B -DskipTests -Pdocs-and-source -Drat.skip=true 
-Drat.ignoreErrors=true clean deploy --settings deploysettings.xml; 
deploy_to_s3 $CURRENT_FLINK_VERSION "hadoop2" ) &
+   (mvn -B -DskipTests -Drat.skip=true -Drat.ignoreErrors=true 
clean deploy --settings deploysettings.xml; deploy_to_s3 $CURRENT_FLINK_VERSION 
"hadoop2" ) &
 
# switch back to the regular flink directory
cd ../flink
echo "deploy hadoop2 version (standard) for scala 2.11 from 
flink directory"
./tools/change-scala-version.sh 2.11
-   mvn -B -DskipTests -Pdocs-and-source -Drat.skip=true 
-Drat.ignoreErrors=true clean deploy --settings deploysettings.xml;
+   mvn -B -DskipTests -Drat.skip=true -Drat.ignoreErrors=true 
clean deploy --settings deploysettings.xml;
 
deploy_to_s3 $CURRENT_FLINK_VERSION "hadoop2_2.11"
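
-Dmaven.javadoc.skip=true is the standard skip switch of the maven-javadoc-plugin,
so snapshot artifacts are deployed without the slow javadoc generation. Combined
with the flags above, a local equivalent of such a deployment is roughly:

mvn -B -DskipTests -Dmaven.javadoc.skip=true -Drat.skip=true -Drat.ignoreErrors=true clean deploy --settings deploysettings.xml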
 



[20/50] [abbrv] flink git commit: [hotfix] [tests] Ignore ZooKeeper logs in process tests

2016-02-12 Thread fhueske
[hotfix] [tests] Ignore ZooKeeper logs in process tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3a643c07
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3a643c07
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3a643c07

Branch: refs/heads/tableOnCalcite
Commit: 3a643c07792c62142c1f8cda172d4f4c3442c9b3
Parents: b8f4025
Author: Ufuk Celebi 
Authored: Tue Feb 9 11:01:39 2016 +0100
Committer: Stephan Ewen 
Committed: Wed Feb 10 15:27:16 2016 +0100

--
 .../JobManagerSubmittedJobGraphsRecoveryITCase.java  | 6 +-
 .../org/apache/flink/runtime/testutils/CommonTestUtils.java  | 1 +
 .../AbstractJobManagerProcessFailureRecoveryITCase.java  | 8 +---
 .../org/apache/flink/test/recovery/ChaosMonkeyITCase.java| 4 
 .../test/recovery/JobManagerCheckpointRecoveryITCase.java| 8 
 5 files changed, 23 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/3a643c07/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java
index 99f7bd7..59c7c39 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.java
@@ -343,6 +343,10 @@ public class JobManagerSubmittedJobGraphsRecoveryITCase 
extends TestLogger {
assertEquals(2, jobSubmitSuccessMessages);
}
catch (Throwable t) {
+   // Print early (in some situations the process logs get 
too big
+   // for Travis and the root problem is not shown)
+   t.printStackTrace();
+
// In case of an error, print the job manager process 
logs.
if (jobManagerProcess[0] != null) {
jobManagerProcess[0].printProcessLog();
@@ -352,7 +356,7 @@ public class JobManagerSubmittedJobGraphsRecoveryITCase 
extends TestLogger {
jobManagerProcess[1].printProcessLog();
}
 
-   t.printStackTrace();
+   throw t;
}
finally {
if (jobManagerProcess[0] != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3a643c07/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index 069b6af..bbb6a89 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -147,6 +147,7 @@ public class CommonTestUtils {

writer.println("log4j.appender.console.layout=org.apache.log4j.PatternLayout");

writer.println("log4j.appender.console.layout.ConversionPattern=%-4r [%t] %-5p 
%c %x - %m%n");

writer.println("log4j.logger.org.eclipse.jetty.util.log=OFF");
+   writer.println("log4j.logger.org.apache.zookeeper=OFF");
 
writer.flush();
writer.close();
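
With the added line, the log4j.properties generated for the spawned test
processes ends with:

log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
log4j.logger.org.eclipse.jetty.util.log=OFF
log4j.logger.org.apache.zookeeper=OFF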

http://git-wip-us.apache.org/repos/asf/flink/blob/3a643c07/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java
--
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java
index 2f6b762..6122352 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractJobManagerProcessFailureRecoveryITCase.java
@@ -246,8 +246,10 @@ public abstract class 
AbstractJobManagerProcessFailureRecoveryITCase extends Tes
fail("The program encountered a " + 
error.getClass().getSimpleName() + " : " + error.getMessage());
 

[09/50] [abbrv] flink git commit: [hotfix] Add missing entries to config reference doc and remove outdated webclient entries

2016-02-12 Thread fhueske
[hotfix] Add missing entries to config reference doc and remove outdated 
webclient entries


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/72f8228a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/72f8228a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/72f8228a

Branch: refs/heads/tableOnCalcite
Commit: 72f8228ada76898c0bf168e5db8788dc35f7f4ed
Parents: c57a7e9
Author: Stephan Ewen 
Authored: Tue Feb 9 16:01:05 2016 +0100
Committer: Stephan Ewen 
Committed: Tue Feb 9 16:01:05 2016 +0100

--
 docs/setup/config.md | 33 ++---
 1 file changed, 18 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/72f8228a/docs/setup/config.md
--
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 0eb3acb..343a856 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -39,6 +39,8 @@ The configuration files for the TaskManagers can be 
different, Flink does not as
 
 - `env.java.home`: The path to the Java installation to use (DEFAULT: system's 
default Java installation, if found). Needs to be specified if the startup 
scripts fail to automatically resolve the java home directory. Can be specified 
to point to a specific java installation or version. If this option is not 
specified, the startup scripts also evaluate the `$JAVA_HOME` environment 
variable.
 
+- `env.java.opts`: Set custom JVM options. This value is respected by Flink's 
start scripts and Flink's YARN client. This can be used to set different 
garbage collectors or to include remote debuggers into the JVMs running Flink's 
services.
+
 - `jobmanager.rpc.address`: The IP address of the JobManager, which is the 
master/coordinator of the distributed system (DEFAULT: localhost).
 
 - `jobmanager.rpc.port`: The port number of the JobManager (DEFAULT: 6123).
@@ -72,6 +74,14 @@ The default fraction for managed memory can be adjusted 
using the `taskmanager.m
 
 - `taskmanager.memory.preallocate`: Can be either of `true` or `false`. 
Specifies whether task managers should allocate all managed memory when 
starting up. (DEFAULT: false)
 
+### Memory and Performance Debugging
+
+These options are useful for debugging a Flink application for memory- and 
garbage-collection-related issues, such as performance degradation and 
out-of-memory process kills or exceptions.
+
+- `taskmanager.debug.memory.startLogThread`: Causes the TaskManagers to 
periodically log memory and garbage collection statistics. The statistics 
include current heap, off-heap, and other memory pool utilization, as well as 
the time spent on garbage collection, per heap memory pool.
+
+- `taskmanager.debug.memory.logIntervalMs`: The interval (in milliseconds) in 
which the TaskManagers log the memory and garbage collection statistics. Only 
has an effect, if `taskmanager.debug.memory.startLogThread` is set to true.
+
 ### Kerberos
 
 Flink supports Kerberos authentication of Hadoop services such as HDFS, YARN, 
or HBase.
@@ -98,8 +108,6 @@ If you are on YARN, then it is sufficient to authenticate 
the client with Kerber
 
 - `taskmanager.network.numberOfBuffers`: The number of buffers available to 
the network stack. This number determines how many streaming data exchange 
channels a TaskManager can have at the same time and how well buffered the 
channels are. If a job is rejected or you get a warning that the system has not 
enough buffers available, increase this value (DEFAULT: 2048).
 
-- `env.java.opts`: Set custom JVM options. This value is respected by Flink's 
start scripts and Flink's YARN client. This can be used to set different 
garbage collectors or to include remote debuggers into the JVMs running Flink's 
services.
-
 - `state.backend`: The backend that will be used to store operator state 
checkpoints if checkpointing is enabled. Supported backends:
-  `jobmanager`: In-memory state, backup to JobManager's/ZooKeeper's 
memory. Should be used only for minimal state (Kafka offsets) or testing and 
local debugging.
-  `filesystem`: State is in-memory on the TaskManagers, and state 
snapshots are stored in a file system. Supported are all filesystems supported 
by Flink, for example HDFS, S3, ...
@@ -130,6 +138,7 @@ The following parameters configure Flink's JobManager and 
TaskManagers.
 
 - `jobmanager.rpc.address`: The IP address of the JobManager, which is the 
master/coordinator of the distributed system (DEFAULT: localhost).
 - `jobmanager.rpc.port`: The port number of the JobManager (DEFAULT: 6123).
+- `taskmanager.hostname`: The hostname of the network interface that the 
TaskManager binds to. By default, the TaskManager searches for network 
interfaces that can connect to the JobManager and other TaskManagers. This 
op
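
In flink-conf.yaml, enabling the memory logging described above could look like
this (the interval value is only an example):

taskmanager.debug.memory.startLogThread: true
taskmanager.debug.memory.logIntervalMs: 5000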

[15/50] [abbrv] flink git commit: [FLINK-3234] [dataSet] Add KeySelector support to sortPartition operation.

2016-02-12 Thread fhueske
[FLINK-3234] [dataSet] Add KeySelector support to sortPartition operation.

This closes #1585


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a63797a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a63797a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a63797a

Branch: refs/heads/tableOnCalcite
Commit: 0a63797a6a5418b2363bca25bd77c33c217ff257
Parents: 572855d
Author: Chiwan Park 
Authored: Thu Feb 4 20:46:10 2016 +0900
Committer: Fabian Hueske 
Committed: Wed Feb 10 11:51:26 2016 +0100

--
 .../java/org/apache/flink/api/java/DataSet.java |  18 ++
 .../java/operators/SortPartitionOperator.java   | 174 +--
 .../api/java/operator/SortPartitionTest.java|  82 +
 .../org/apache/flink/api/scala/DataSet.scala|  25 +++
 .../api/scala/PartitionSortedDataSet.scala  |  22 ++-
 .../javaApiOperators/SortPartitionITCase.java   |  61 +++
 .../scala/operators/SortPartitionITCase.scala   |  59 +++
 7 files changed, 385 insertions(+), 56 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/0a63797a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
--
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index bfb97f4..c315920 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -1381,6 +1381,24 @@ public abstract class DataSet {
return new SortPartitionOperator<>(this, field, order, 
Utils.getCallLocationName());
}
 
+   /**
+* Locally sorts the partitions of the DataSet on the extracted key in 
the specified order.
+* The DataSet can be sorted on multiple values by returning a tuple 
from the KeySelector.
+*
+* Note that no additional sort keys can be appended to a KeySelector 
sort key. To sort
+* the partitions by multiple values using a KeySelector, the KeySelector 
must return a tuple
+* consisting of those values.
+*
+* @param keyExtractor The KeySelector function which extracts the key 
values from the DataSet
+* on which the DataSet is sorted.
+* @param order The order in which the DataSet is sorted.
+* @return The DataSet with sorted local partitions.
+*/
+   public  SortPartitionOperator sortPartition(KeySelector 
keyExtractor, Order order) {
+   final TypeInformation keyType = 
TypeExtractor.getKeySelectorTypes(keyExtractor, getType());
+   return new SortPartitionOperator<>(this, new 
Keys.SelectorFunctionKeys<>(clean(keyExtractor), getType(), keyType), order, 
Utils.getCallLocationName());
+   }
+
// 

//  Top-K
// 
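
A self-contained usage sketch of the new sortPartition(KeySelector, Order)
method added above (values are illustrative, not part of the patch):

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

public class SortPartitionExample {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<Integer, String>> ds = env.fromElements(
				new Tuple2<>(3, "c"), new Tuple2<>(1, "a"), new Tuple2<>(2, "b"));

		// locally sort each partition on the extracted Integer key
		ds.sortPartition(new KeySelector<Tuple2<Integer, String>, Integer>() {
				@Override
				public Integer getKey(Tuple2<Integer, String> value) {
					return value.f0;
				}
			}, Order.ASCENDING)
			.print();
	}
}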


http://git-wip-us.apache.org/repos/asf/flink/blob/0a63797a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
--
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
index 354a0cd..7f30a30 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
@@ -26,9 +26,13 @@ import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
 
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * This operator represents a DataSet with locally sorted partitions.
@@ -38,27 +42,58 @@ import java.util.Arrays;
 @Public
 public class SortPartitionOperator extends SingleInputOperator> {
 
-   private int[] sortKeyPositions;
+   private List> keys;
 
-   private Order[] sortOrders;
+   private List orders;
 
private final String sortLocationName;
 
+   private boolean useKeySelector;
 
-   public SortPartitionOperator(DataSet dataSet, int sortField, Order 
so

[43/50] [abbrv] flink git commit: [FLINK-3226] Implement a CodeGenerator for an efficient translation to DataSet programs

2016-02-12 Thread fhueske
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
deleted file mode 100644
index 4927fb2..000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.typeinfo
-
-import java.util
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import 
org.apache.flink.api.common.typeutils.CompositeType.{TypeComparatorBuilder,
-FlatFieldDescriptor}
-import org.apache.flink.api.common.typeutils.{CompositeType, TypeSerializer}
-
-/**
- * A TypeInformation that is used to rename fields of an underlying 
CompositeType. This
- * allows the system to translate "as" Table API operations to a 
[[RenameOperator]]
- * that does not get translated to a runtime operator.
- */
-class RenamingProxyTypeInfo[T](
-val tpe: CompositeType[T],
-val fieldNames: Array[String])
-  extends CompositeType[T](tpe.getTypeClass) {
-
-  def getUnderlyingType: CompositeType[T] = tpe
-
-  if (tpe.getArity != fieldNames.length) {
-throw new IllegalArgumentException(s"Number of field names 
'${fieldNames.mkString(",")}' and " +
-  s"number of fields in underlying type $tpe do not match.")
-  }
-
-  if (fieldNames.toSet.size != fieldNames.length) {
-throw new IllegalArgumentException(s"New field names must be unique. " +
-  s"Names: ${fieldNames.mkString(",")}.")
-  }
-
-  override def getFieldIndex(fieldName: String): Int = {
-val result = fieldNames.indexOf(fieldName)
-if (result != fieldNames.lastIndexOf(fieldName)) {
-  -2
-} else {
-  result
-}
-  }
-  override def getFieldNames: Array[String] = fieldNames
-
-  override def isBasicType: Boolean = tpe.isBasicType
-
-  override def createSerializer(executionConfig: ExecutionConfig): 
TypeSerializer[T] =
-tpe.createSerializer(executionConfig)
-
-  override def getArity: Int = tpe.getArity
-
-  override def isKeyType: Boolean = tpe.isKeyType
-
-  override def getTypeClass: Class[T] = tpe.getTypeClass
-
-  override def getGenericParameters: java.util.List[TypeInformation[_]] = 
tpe.getGenericParameters
-
-  override def isTupleType: Boolean = tpe.isTupleType
-
-  override def toString = {
-s"RenamingType(type: ${tpe.getTypeClass.getSimpleName}; " +
-  s"fields: ${fieldNames.mkString(", ")})"
-  }
-
-  override def getTypeAt[X](pos: Int): TypeInformation[X] = tpe.getTypeAt(pos)
-
-  override def getTotalFields: Int = tpe.getTotalFields
-
-  override def createComparator(
-logicalKeyFields: Array[Int],
-orders: Array[Boolean],
-logicalFieldOffset: Int,
-executionConfig: ExecutionConfig) =
-tpe.createComparator(logicalKeyFields, orders, logicalFieldOffset, 
executionConfig)
-
-  override def getFlatFields(
-  fieldExpression: String,
-  offset: Int,
-  result: util.List[FlatFieldDescriptor]): Unit = {
-
-// split of head of field expression
-val (head, tail) = if (fieldExpression.indexOf('.') >= 0) {
-  fieldExpression.splitAt(fieldExpression.indexOf('.'))
-} else {
-  (fieldExpression, "")
-}
-
-// replace proxy field name by original field name of wrapped type
-val headPos = getFieldIndex(head)
-if (headPos >= 0) {
-  val resolvedHead = tpe.getFieldNames()(headPos)
-  val resolvedFieldExpr = resolvedHead + tail
-
-  // get flat fields with wrapped field name
-  tpe.getFlatFields(resolvedFieldExpr, offset, result)
-}
-else {
-  throw new IllegalArgumentException(s"Invalid field expression: 
${fieldExpression}")
-}
-  }
-
-  override def getTypeAt[X](fieldExpression: String): TypeInformation[X] = {
-tpe.getTypeAt(fieldExpr
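
The resolution logic above boils down to remapping the renamed head of a field
expression to the wrapped type's original field name. A standalone Java sketch
of that step (names illustrative, not Flink classes):

import java.util.Arrays;

public class RenameResolutionSketch {

	static String resolve(String fieldExpr, String[] renamed, String[] original) {
		// split off the head of the field expression
		int dot = fieldExpr.indexOf('.');
		String head = dot >= 0 ? fieldExpr.substring(0, dot) : fieldExpr;
		String tail = dot >= 0 ? fieldExpr.substring(dot) : "";

		// replace the proxy field name by the original field name of the wrapped type
		int pos = Arrays.asList(renamed).indexOf(head);
		if (pos < 0) {
			throw new IllegalArgumentException("Invalid field expression: " + fieldExpr);
		}
		return original[pos] + tail;
	}

	public static void main(String[] args) {
		// proxy renames (f0, f1) to (a, b); "a.x" resolves back to "f0.x"
		System.out.println(resolve("a.x", new String[]{"a", "b"}, new String[]{"f0", "f1"}));
	}
}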

[35/50] [abbrv] flink git commit: [FLINK-3223] Translate Table API calls to Calcite RelNodes.

2016-02-12 Thread fhueske
http://git-wip-us.apache.org/repos/asf/flink/blob/ed6cc91e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala
deleted file mode 100644
index 07acf1e..000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.table.expressions.{Expression, Aggregation}
-
-import scala.collection.mutable
-
-import org.apache.flink.api.table.trees.Rule
-
-/**
- * Rule that verifies that an expression does not contain aggregate operations
- * as children of aggregate operations.
- */
-class VerifyNoNestedAggregates extends Rule[Expression] {
-
-  def apply(expr: Expression) = {
-val errors = mutable.MutableList[String]()
-
-val result = expr.transformPre {
-  case agg: Aggregation=> {
-if (agg.child.exists(_.isInstanceOf[Aggregation])) {
-  errors += s"""Found nested aggregation inside "$agg"."""
-}
-agg
-  }
-}
-
-if (errors.length > 0) {
-  throw new ExpressionException(
-s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
-}
-
-result
-
-  }
-}
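
The removed rule walked the expression tree and rejected aggregations nested
inside other aggregations. A self-contained Java sketch of that check (all
names illustrative, not Flink classes):

import java.util.ArrayList;
import java.util.List;

public class NestedAggregateCheckSketch {

	static class Expr {
		final List<Expr> children = new ArrayList<>();

		boolean containsAggregation() {
			if (this instanceof Agg) {
				return true;
			}
			for (Expr c : children) {
				if (c.containsAggregation()) {
					return true;
				}
			}
			return false;
		}
	}

	static class Agg extends Expr {
		Agg(Expr child) {
			children.add(child);
		}
	}

	static void verifyNoNestedAggregates(Expr e) {
		if (e instanceof Agg && e.children.get(0).containsAggregation()) {
			throw new IllegalArgumentException("Found nested aggregation inside " + e);
		}
		for (Expr c : e.children) {
			verifyNoNestedAggregates(c);
		}
	}

	public static void main(String[] args) {
		verifyNoNestedAggregates(new Agg(new Expr()));           // fine
		verifyNoNestedAggregates(new Agg(new Agg(new Expr())));  // throws
	}
}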

http://git-wip-us.apache.org/repos/asf/flink/blob/ed6cc91e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
index e866ea0..c23fa03 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
@@ -17,8 +17,8 @@
  */
 package org.apache.flink.api.table.expressions
 
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo}
 import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo, 
NumericTypeInfo, TypeInformation}
 
 abstract class BinaryArithmetic extends BinaryExpression { self: Product =>
   def typeInfo = {
@@ -83,7 +83,7 @@ case class Mul(left: Expression, right: Expression) extends 
BinaryArithmetic {
 }
 
 case class Mod(left: Expression, right: Expression) extends BinaryArithmetic {
-  override def toString = s"($left * $right)"
+  override def toString = s"($left % $right)"
 }
 
 case class Abs(child: Expression) extends UnaryExpression {
@@ -91,55 +91,3 @@ case class Abs(child: Expression) extends UnaryExpression {
 
   override def toString = s"abs($child)"
 }
-
-abstract class BitwiseBinaryArithmetic extends BinaryExpression { self: 
Product =>
-  def typeInfo: TypeInformation[_] = {
-if (!left.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-  throw new ExpressionException(
-s"""Non-integer operand ${left} of type ${left.typeInfo} in $this""")
-}
-if (!right.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-  throw new ExpressionException(
-s"""Non-integer operand "${right}" of type ${right.typeInfo} in 
$this""")
-}
-if (left.typeInfo != right.typeInfo) {
-  throw new ExpressionException(s"Differing operand data types 
${left.typeInfo} and " +
-s"${right.typeInfo} in $this")
-}
-if (left.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
-  left.typeInfo
-} else {
-  BasicTypeInfo.INT_TYPE_INFO
-}
-  }
-}
-
-case class BitwiseAnd(left: Expression, right: Expression) extends 
BitwiseBinaryArit

[08/50] [abbrv] flink git commit: [FLINK-3373] [build] Bump version of transitive HTTP Components dependency to 4.4.4 (core) / 4.5.1 (client)

2016-02-12 Thread fhueske
[FLINK-3373] [build] Bump version of transitive HTTP Components dependency to 
4.4.4 (core) / 4.5.1 (client)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c57a7e91
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c57a7e91
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c57a7e91

Branch: refs/heads/tableOnCalcite
Commit: c57a7e910e1dcb45c2c3f7b7743a1c8b37ce7639
Parents: a4f0692
Author: Stephan Ewen 
Authored: Tue Feb 9 14:58:09 2016 +0100
Committer: Stephan Ewen 
Committed: Tue Feb 9 15:00:51 2016 +0100

--
 pom.xml | 11 ++-
 1 file changed, 2 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/c57a7e91/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 1f1a772..81528c8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -240,13 +240,6 @@ under the License.
1.7


-   
-   
-   stax
-   stax-api
-   1.0.1
-   
-   


com.esotericsoftware.kryo
@@ -351,13 +344,13 @@ under the License.

org.apache.httpcomponents
httpcore
-   4.2.5
+   4.4.4

 

org.apache.httpcomponents
httpclient
-   4.2.6
+   4.5.1

 




[07/50] [abbrv] flink git commit: [FLINK-3286] Remove the files from the debian packaging as well

2016-02-12 Thread fhueske
[FLINK-3286] Remove the files from the debian packaging as well


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4f0692e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4f0692e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4f0692e

Branch: refs/heads/tableOnCalcite
Commit: a4f0692e9c15b3e30c25716eca2d83e7140d687e
Parents: 9ee1679
Author: Robert Metzger 
Authored: Tue Feb 9 10:10:56 2016 +0100
Committer: Robert Metzger 
Committed: Tue Feb 9 12:09:29 2016 +0100

--
 flink-dist/src/deb/bin/jobmanager   | 236 ---
 flink-dist/src/deb/bin/taskmanager  | 236 ---
 flink-dist/src/deb/bin/webclient| 236 ---
 flink-dist/src/deb/control/control  |  26 
 flink-dist/src/deb/control/postinst |  75 --
 5 files changed, 809 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/a4f0692e/flink-dist/src/deb/bin/jobmanager
--
diff --git a/flink-dist/src/deb/bin/jobmanager 
b/flink-dist/src/deb/bin/jobmanager
deleted file mode 100755
index 1ec0172..000
--- a/flink-dist/src/deb/bin/jobmanager
+++ /dev/null
@@ -1,236 +0,0 @@
-#! /bin/sh
-
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#  http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-#
-# skeleton  example file to build /etc/init.d/ scripts.
-#This file should be used to construct scripts for /etc/init.d.
-#
-#Written by Miquel van Smoorenburg .
-#Modified for Debian
-#by Ian Murdock .
-#   Further changes by Javier Fernandez-Sanguino 
-#
-# Version:  @(#)skeleton  1.9  26-Feb-2001  miqu...@cistron.nl
-#
-# Starts a flink jobmanager
-#
-# chkconfig: 2345 85 15
-# description: flink jobmanager
-#
-### BEGIN INIT INFO
-# Provides:  flink-jobmanager
-# Required-Start:$network $local_fs
-# Required-Stop:
-# Should-Start:  $named
-# Should-Stop:
-# Default-Start: 2 3 4 5
-# Default-Stop:  0 1 6
-# Short-Description: flink jobmanager daemon
-### END INIT INFO
-
-# Include flink defaults if available
-if [ -f /etc/default/flink ] ; then
-  . /etc/default/flink
-fi
-
-
-if [ "$FLINK_PID_DIR" = "" ]; then
-   FLINK_PID_DIR=/tmp
-fi
-
-if [ "$FLINK_IDENT_STRING" = "" ]; then
-   FLINK_IDENT_STRING="$USER"
-fi
-
-FLINK_HOME=/usr/share/flink-dist
-PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin
-DAEMON_SCRIPT=$FLINK_HOME/bin/jobmanager.sh
-NAME=flink-jobmanager
-DESC="flink jobmanager daemon"
-PID_FILE=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-jobmanager.pid
-
-test -x $DAEMON_SCRIPT || exit 1
-
-
-DODTIME=5   # Time to wait for the server to die, in seconds
-# If this value is set too low you might not
-# let some servers to die gracefully and
-# 'restart' will not work
-
-# Checks if the given pid represents a live process.
-# Returns 0 if the pid is a live process, 1 otherwise
-flink_is_process_alive() {
-  local pid="$1" 
-  ps -fp $pid | grep $pid | grep jobmanager > /dev/null 2>&1
-}
-
-# Check if the process associated to a pidfile is running.
-# Return 0 if the pidfile exists and the process is running, 1 otherwise
-flink_check_pidfile() {
-  local pidfile="$1" # IN
-  local pid
-
-  pid=`cat "$pidfile" 2>/dev/null`
-  if [ "$pid" = '' ]; then
-# The file probably does not exist or is empty. 
-return 1
-  fi
-  
-  set -- $pid
-  pid="$1"
-
-  flink_is_process_alive $pid
-}
-
-flink_process_kill() {
-   local pid="$1"# IN
-   local signal="$2" # IN
-   local second
-
-   kill -$signal $pid 2>/dev/null
-
-   # Wait a bit to see if the dirty job has really been done
-   for second in 0 1 2 3 4 5 6 7 8 9 10; do
-  if flink_is_process_alive "$pid"; then
- # Success
- return 0
-  fi
-
-  sleep 1
-   done
-

[27/50] [abbrv] flink git commit: [FLINK-3369] [runtime] Make RemoteTransportException an instance of CancelTaskException

2016-02-12 Thread fhueske
[FLINK-3369] [runtime] Make RemoteTransportException an instance of 
CancelTaskException

Problem: RemoteTransportException (RTE) is thrown on data transfer failures
when the remote data producer fails. Because RTE is an instance of IOException,
it can happen that the RTE is reported as the root job failure cause.

Solution: Make RTE an instance of CancelTaskException, leading to cancellation of
the task instead of failure of the job.

Squashes the following commit:

[pr-comments] Add remote address to RemoteTransportException

This closes #1621.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf3ae88b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf3ae88b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf3ae88b

Branch: refs/heads/tableOnCalcite
Commit: cf3ae88b73e30a2d69ac1cc6009a8304ea3f53cc
Parents: fd324ea
Author: Ufuk Celebi 
Authored: Wed Feb 10 19:51:20 2016 +0100
Committer: Ufuk Celebi 
Committed: Thu Feb 11 14:39:40 2016 +0100

--
 .../runtime/execution/CancelTaskException.java  |  4 ++
 .../network/netty/PartitionRequestClient.java   |  6 +--
 .../netty/PartitionRequestClientFactory.java|  4 +-
 .../netty/PartitionRequestClientHandler.java| 17 +++-
 .../exception/LocalTransportException.java  | 22 +++---
 .../exception/RemoteTransportException.java | 36 +---
 .../netty/exception/TransportException.java | 43 
 7 files changed, 62 insertions(+), 70 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/cf3ae88b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/CancelTaskException.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/CancelTaskException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/CancelTaskException.java
index 3bcbe2e..ebf58ec 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/CancelTaskException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/CancelTaskException.java
@@ -34,6 +34,10 @@ public class CancelTaskException extends RuntimeException {
super(msg);
}
 
+   public CancelTaskException(String msg, Throwable cause) {
+   super(msg, cause);
+   }
+
public CancelTaskException() {
super();
}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf3ae88b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
index f6120d4..fb24a8e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
@@ -114,7 +114,7 @@ public class PartitionRequestClient {
inputChannel.onError(
new 
LocalTransportException(

"Sending the partition request failed.",
-   
future.channel().localAddress(), future.cause()
+   
future.cause()
));
}
}
@@ -158,7 +158,7 @@ public class PartitionRequestClient {
if 
(!future.isSuccess()) {

inputChannel.onError(new LocalTransportException(

"Sending the task event failed.",
-   
future.channel().localAddress(), future.cause()
+   
future.cause()
));
}
}
@@ -185,7 +185,7 @@ public class PartitionRequestClient {
 
private void checkNotClosed() throws IOException {
if (closeReferenceCounter.isDisposed()) {
-   throw new LocalTransportEx
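
One way the new TransportException base class from the diffstat can realize the
fix; the hierarchy follows the commit message, and the constructor signatures
are assumptions:

import java.net.SocketAddress;

import org.apache.flink.runtime.execution.CancelTaskException;

// transport failures now cancel the affected task instead of being
// reported as the root cause of a job failure
class TransportExceptionSketch extends CancelTaskException {
	TransportExceptionSketch(String message, Throwable cause) {
		super(message, cause);
	}
}

class RemoteTransportExceptionSketch extends TransportExceptionSketch {

	// remote address added per the squashed [pr-comments] commit
	private final SocketAddress remoteAddress;

	RemoteTransportExceptionSketch(String message, SocketAddress remoteAddress, Throwable cause) {
		super(message, cause);
		this.remoteAddress = remoteAddress;
	}

	public SocketAddress getRemoteAddress() {
		return remoteAddress;
	}
}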

[34/50] [abbrv] flink git commit: [FLINK-3223] Translate Table API calls to Calcite RelNodes.

2016-02-12 Thread fhueske
http://git-wip-us.apache.org/repos/asf/flink/blob/ed6cc91e/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
--
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
index 9e42f53..da82b8a 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java
@@ -53,15 +53,15 @@ public class SelectITCase extends MultipleProgramsTestBase {
Table result = in
.select("a, b, c");
 
-   DataSet resultSet = tableEnv.toDataSet(result, Row.class);
-   List results = resultSet.collect();
-   String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello 
world\n" + "4,3,Hello world, " +
-   "how are you?\n" + "5,3,I am fine.\n" + 
"6,3,Luke Skywalker\n" + "7,4," +
-   "Comment#1\n" + "8,4,Comment#2\n" + 
"9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-   "Comment#5\n" + "12,5,Comment#6\n" + 
"13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-   "Comment#9\n" + "16,6,Comment#10\n" + 
"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-   "6,Comment#13\n" + "20,6,Comment#14\n" + 
"21,6,Comment#15\n";
-   compareResultAsText(results, expected);
+// DataSet resultSet = tableEnv.toDataSet(result, Row.class);
+// List results = resultSet.collect();
+// String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello 
world\n" + "4,3,Hello world, " +
+// "how are you?\n" + "5,3,I am fine.\n" + 
"6,3,Luke Skywalker\n" + "7,4," +
+// "Comment#1\n" + "8,4,Comment#2\n" + 
"9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+// "Comment#5\n" + "12,5,Comment#6\n" + 
"13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+// "Comment#9\n" + "16,6,Comment#10\n" + 
"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+// "6,Comment#13\n" + "20,6,Comment#14\n" + 
"21,6,Comment#15\n";
+// compareResultAsText(results, expected);
 
}
 
@@ -78,15 +78,15 @@ public class SelectITCase extends MultipleProgramsTestBase {
.select("f0 as a, f1 as b")
.select("a, b");
 
-   DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-   List<Row> results = resultSet.collect();
-   String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + 
"5,3\n" + "6,3\n" + "7,4\n" +
-   "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + 
"12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-   "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + 
"20,6\n" + "21,6\n";
-   compareResultAsText(results, expected);
+// DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+// List<Row> results = resultSet.collect();
+// String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + 
"5,3\n" + "6,3\n" + "7,4\n" +
+// "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + 
"12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+// "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + 
"20,6\n" + "21,6\n";
+// compareResultAsText(results, expected);
}
 
-   @Test(expected = ExpressionException.class)
+   @Test(expected = IllegalArgumentException.class)
public void testAsWithToFewFields() throws Exception {
ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
@@ -95,13 +95,13 @@ public class SelectITCase extends MultipleProgramsTestBase {
 
Table in = tableEnv.fromDataSet(ds, "a, b");
 
-   DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
-   List<Row> results = resultSet.collect();
-   String expected = " sorry dude ";
-   compareResultAsText(results, expected);
+// DataSet<Row> resultSet = tableEnv.toDataSet(in, Row.class);
+// List<Row> results = resultSet.collect();
+// String expected = " sorry dude ";
+// compareResultAsText(results, expected);
}
 
-   @Test(expected = ExpressionException.class)
+   @Test(expected = IllegalArgumentException.class)
public void testAsWithToManyFields() throws Exception {
ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment

[45/50] [abbrv] flink git commit: [FLINK-3226] Implement a CodeGenerator for an efficient translation to DataSet programs

2016-02-12 Thread fhueske
[FLINK-3226] Implement a CodeGenerator for an efficient translation to DataSet 
programs

This closes #1595


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4ad9dd5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4ad9dd5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4ad9dd5

Branch: refs/heads/tableOnCalcite
Commit: a4ad9dd566353c578ad660fda7039757a605f27d
Parents: 99f60c8
Author: twalthr 
Authored: Fri Feb 5 17:40:54 2016 +0100
Committer: Fabian Hueske 
Committed: Fri Feb 12 11:34:09 2016 +0100

--
 .../api/java/table/JavaBatchTranslator.scala|  11 +-
 .../flink/api/java/table/TableEnvironment.scala |  15 +-
 .../api/scala/table/ScalaBatchTranslator.scala  |   6 +-
 .../api/scala/table/TableConversions.scala  |   9 +-
 .../api/scala/table/TableEnvironment.scala  |  75 ++
 .../apache/flink/api/table/TableConfig.scala|  30 +-
 .../apache/flink/api/table/TableException.scala |  23 +
 .../api/table/codegen/CodeGenException.scala|  24 +
 .../flink/api/table/codegen/CodeGenUtils.scala  | 176 
 .../flink/api/table/codegen/CodeGenerator.scala | 752 ++
 .../table/codegen/ExpressionCodeGenerator.scala | 794 ---
 .../api/table/codegen/GenerateFilter.scala  |  99 ---
 .../flink/api/table/codegen/GenerateJoin.scala  | 171 
 .../table/codegen/GenerateResultAssembler.scala | 119 ---
 .../api/table/codegen/GenerateSelect.scala  |  84 --
 .../api/table/codegen/GeneratedExpression.scala |  27 +
 .../api/table/codegen/GeneratedFunction.scala   |  23 +
 .../flink/api/table/codegen/Indenter.scala  |  10 +-
 .../api/table/codegen/OperatorCodeGen.scala | 367 +
 .../flink/api/table/expressions/literals.scala  |   6 +-
 .../flink/api/table/plan/TypeConverter.scala|  83 +-
 .../plan/nodes/dataset/DataSetExchange.scala|  11 +-
 .../plan/nodes/dataset/DataSetFlatMap.scala |  24 +-
 .../plan/nodes/dataset/DataSetGroupReduce.scala |   7 +-
 .../table/plan/nodes/dataset/DataSetJoin.scala  |   7 +-
 .../table/plan/nodes/dataset/DataSetMap.scala   |  26 +-
 .../plan/nodes/dataset/DataSetReduce.scala  |   7 +-
 .../table/plan/nodes/dataset/DataSetRel.scala   |   8 +-
 .../table/plan/nodes/dataset/DataSetSort.scala  |  11 +-
 .../plan/nodes/dataset/DataSetSource.scala  | 190 ++---
 .../table/plan/nodes/dataset/DataSetUnion.scala |   7 +-
 .../plan/rules/dataset/DataSetFilterRule.scala  |  50 +-
 .../plan/rules/dataset/DataSetProjectRule.scala |  38 +-
 .../runtime/ExpressionAggregateFunction.scala   | 100 ---
 .../runtime/ExpressionFilterFunction.scala  |  50 --
 .../table/runtime/ExpressionJoinFunction.scala  |  57 --
 .../runtime/ExpressionSelectFunction.scala  |  56 --
 .../flink/api/table/runtime/FlatMapRunner.scala |  51 ++
 .../api/table/runtime/FunctionCompiler.scala|  35 +
 .../flink/api/table/runtime/MapRunner.scala |  50 ++
 .../flink/api/table/runtime/package.scala   |  23 -
 .../api/table/typeinfo/RenameOperator.scala |  36 -
 .../table/typeinfo/RenamingProxyTypeInfo.scala  | 143 
 .../flink/api/table/typeinfo/RowTypeInfo.scala  |  23 +-
 .../api/java/table/test/AggregationsITCase.java |   1 -
 .../flink/api/java/table/test/AsITCase.java |   4 +-
 .../api/java/table/test/CastingITCase.java  |  13 +-
 .../api/java/table/test/ExpressionsITCase.java  |  20 +-
 .../flink/api/java/table/test/FilterITCase.java |  26 +-
 .../table/test/GroupedAggregationsITCase.java   |   1 -
 .../flink/api/java/table/test/JoinITCase.java   |   1 -
 .../api/java/table/test/PojoGroupingITCase.java |   4 +-
 .../flink/api/java/table/test/SelectITCase.java |  22 +-
 .../table/test/StringExpressionsITCase.java |   7 +-
 .../flink/api/java/table/test/UnionITCase.java  |   1 -
 .../api/scala/table/test/CastingITCase.scala|  11 +-
 .../scala/table/test/ExpressionsITCase.scala|  33 +-
 .../api/scala/table/test/FilterITCase.scala |  59 +-
 .../api/scala/table/test/SelectITCase.scala |  27 +-
 .../table/test/StringExpressionsITCase.scala|   6 +-
 .../api/table/test/TableProgramsTestBase.scala  |  97 +++
 .../typeinfo/RenamingProxyTypeInfoTest.scala|  75 --
 .../api/table/typeinfo/RowComparatorTest.scala  |   3 +-
 .../api/table/typeinfo/RowSerializerTest.scala  |  20 +-
 64 files changed, 2199 insertions(+), 2146 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scal
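
The generator's Scala sources are only listed above; as a hedged illustration
of the overall technique named in the commit title, runtime compilation of
assembled source text might look like this in Java (Janino and every name
below are assumptions, not taken from the commit):

import org.codehaus.janino.SimpleCompiler;

public class CodeGenSketch {
    public static void main(String[] args) throws Exception {
        // Assemble source for a tiny function, then compile it in-memory.
        String source =
            "public class GeneratedMapper {" +
            "    public int map(int x) { return x + 1; }" +
            "}";

        SimpleCompiler compiler = new SimpleCompiler();
        compiler.cook(source);

        Class<?> clazz = compiler.getClassLoader().loadClass("GeneratedMapper");
        Object mapper = clazz.newInstance();
        System.out.println(clazz.getMethod("map", int.class).invoke(mapper, 41)); // prints 42
    }
}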

[12/50] [abbrv] flink git commit: [FLINK-3366] Rename @Experimental annotation to @PublicEvolving

2016-02-12 Thread fhueske
http://git-wip-us.apache.org/repos/asf/flink/blob/572855da/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
--
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 736c41b..04c1980 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.annotation.{Experimental, Public}
+import org.apache.flink.annotation.{PublicEvolving, Public}
 import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, 
MapFunction, Partitioner}
 import org.apache.flink.api.common.io.OutputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -50,6 +50,7 @@ class DataStream[T](stream: JavaStream[T]) {
 
   /**
 * Returns the [[StreamExecutionEnvironment]] associated with the current 
[[DataStream]].
+ *
 * @return associated execution environment
 */
   def getExecutionEnvironment: StreamExecutionEnvironment =
@@ -60,7 +61,7 @@ class DataStream[T](stream: JavaStream[T]) {
*
* @return ID of the DataStream
*/
-  @Experimental
+  @PublicEvolving
   def getId = stream.getId
 
   /**
@@ -128,7 +129,7 @@ class DataStream[T](stream: JavaStream[T]) {
 * @param uid The unique user-specified ID of this transformation.
 * @return The operator with the specified ID.
 */
-  @Experimental
+  @PublicEvolving
   def uid(uid: String) : DataStream[T] = javaStream match {
 case stream : SingleOutputStreamOperator[T,_] => stream.uid(uid)
 case _ => throw new UnsupportedOperationException("Only supported for 
operators.")
@@ -142,7 +143,7 @@ class DataStream[T](stream: JavaStream[T]) {
* however it is not advised for performance considerations.
*
*/
-  @Experimental
+  @PublicEvolving
   def disableChaining(): DataStream[T] = {
 stream match {
   case ds: SingleOutputStreamOperator[_, _] => ds.disableChaining();
@@ -158,7 +159,7 @@ class DataStream[T](stream: JavaStream[T]) {
* previous tasks even if possible.
*
*/
-  @Experimental
+  @PublicEvolving
   def startNewChain(): DataStream[T] = {
 stream match {
   case ds: SingleOutputStreamOperator[_, _] => ds.startNewChain();
@@ -175,7 +176,7 @@ class DataStream[T](stream: JavaStream[T]) {
* All subsequent operators are assigned to the default resource group.
*
*/
-  @Experimental
+  @PublicEvolving
   def isolateResources(): DataStream[T] = {
 stream match {
   case ds: SingleOutputStreamOperator[_, _] => ds.isolateResources();
@@ -196,7 +197,7 @@ class DataStream[T](stream: JavaStream[T]) {
* degree of parallelism for the operators must be decreased from the
* default.
*/
-  @Experimental
+  @PublicEvolving
   def startNewResourceGroup(): DataStream[T] = {
 stream match {
   case ds: SingleOutputStreamOperator[_, _] => ds.startNewResourceGroup();
@@ -345,14 +346,14 @@ class DataStream[T](stream: JavaStream[T]) {
* the first instance of the next processing operator. Use this setting with 
care
* since it might cause a serious performance bottleneck in the application.
*/
-  @Experimental
+  @PublicEvolving
   def global: DataStream[T] = stream.global()
 
   /**
* Sets the partitioning of the DataStream so that the output tuples
* are shuffled to the next component.
*/
-  @Experimental
+  @PublicEvolving
   def shuffle: DataStream[T] = stream.shuffle()
 
   /**
@@ -385,7 +386,7 @@ class DataStream[T](stream: JavaStream[T]) {
* In cases where the different parallelisms are not multiples of each other 
one or several
* downstream operations will have a differing number of inputs from 
upstream operations.
*/
-  @Experimental
+  @PublicEvolving
   def rescale: DataStream[T] = stream.rescale()
 
   /**
@@ -408,7 +409,7 @@ class DataStream[T](stream: JavaStream[T]) {
* the keepPartitioning flag to true
*
*/
-  @Experimental
+  @PublicEvolving
   def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]),
 maxWaitTimeMillis:Long = 0,
 keepPartitioning: Boolean = false) : DataStream[R] = {
@@ -438,7 +439,7 @@ class DataStream[T](stream: JavaStream[T]) {
* to 0 then the iteration sources will indefinitely, so the job must be 
killed to stop.
*
*/
-  @Experimental
+  @PublicEvolving
   def iterate[R, F: TypeInformation: ClassTag](stepFunction: 
ConnectedStreams[T, F] =>
 (DataStream[F], DataStream[R]), maxWaitTimeMillis:Long): DataStream[R] = {
 val feedbackType: TypeInformation[F] = implicitly[TypeInformation[F]]
@@ -625,7 +626,7 @@ class Dat

[38/50] [abbrv] flink git commit: [FLINK-3225] Enforce translation to DataSetNodes

2016-02-12 Thread fhueske
[FLINK-3225] Enforce translation to DataSetNodes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3cb76fcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3cb76fcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3cb76fcb

Branch: refs/heads/tableOnCalcite
Commit: 3cb76fcbcc997e975ac7f1589c9f65d83dfd0137
Parents: fe5e406
Author: Fabian Hueske 
Authored: Mon Feb 1 23:45:16 2016 +0100
Committer: Fabian Hueske 
Committed: Fri Feb 12 11:34:09 2016 +0100

--
 .../flink/api/java/table/JavaBatchTranslator.scala  | 16 +---
 1 file changed, 9 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/3cb76fcb/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
index 66bfbe7..7e91190 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
@@ -19,14 +19,14 @@
 package org.apache.flink.api.java.table
 
 import org.apache.calcite.plan.{RelTraitSet, RelOptUtil}
-import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.{RelCollations, RelNode}
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.Programs
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.{DataSet => JavaDataSet}
 import org.apache.flink.api.table.plan._
 import org.apache.flink.api.table.Table
-import org.apache.flink.api.table.plan.nodes.dataset.DataSetRel
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetRel}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
 import org.apache.flink.api.table.plan.schema.DataSetTable
 
@@ -61,20 +61,19 @@ class JavaBatchTranslator extends PlanTranslator {
 // get the planner for the plan
 val planner = lPlan.getCluster.getPlanner
 
-// we do not have any special requirements for the output
-val outputProps = RelTraitSet.createEmpty()
 
 println("---")
 println("Input Plan:")
 println("---")
 println(RelOptUtil.toString(lPlan))
-
+
 // decorrelate
 val decorPlan = RelDecorrelator.decorrelateQuery(lPlan)
 
 // optimize the logical Flink plan
 val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
-val optPlan = optProgram.run(planner, decorPlan, outputProps)
+val flinkOutputProps = RelTraitSet.createEmpty()
+val optPlan = optProgram.run(planner, decorPlan, flinkOutputProps)
 
 println("---")
 println("Optimized Plan:")
@@ -83,7 +82,10 @@ class JavaBatchTranslator extends PlanTranslator {
 
 // optimize the logical Flink plan
 val dataSetProgram = Programs.ofRules(FlinkRuleSets.DATASET_TRANS_RULES)
-val dataSetPlan = dataSetProgram.run(planner, optPlan, outputProps)
+val dataSetOutputProps = RelTraitSet.createEmpty()
+  .plus(DataSetConvention.INSTANCE)
+  .plus(RelCollations.of()).simplify()
+val dataSetPlan = dataSetProgram.run(planner, optPlan, dataSetOutputProps)
 
 println("-")
 println("DataSet Plan:")



[24/50] [abbrv] flink git commit: [FLINK-3107] [runtime] Start checkpoint ID counter with periodic scheduler

2016-02-12 Thread fhueske
[FLINK-3107] [runtime] Start checkpoint ID counter with periodic scheduler

Problem: The job manager enables checkpoints during submission of streaming
programs. This can lead to a call to 
`ZooKeeperCheckpointIDCounter.start()`,
which communicates with ZooKeeper. This can block the job manager actor.

Solution: Start the counter in the `CheckpointCoordinatorDeActivator`.

This closes #1610.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8df0bbac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8df0bbac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8df0bbac

Branch: refs/heads/tableOnCalcite
Commit: 8df0bbacb8712342471c12cbf765a0a92b70abc9
Parents: 6968a57
Author: Ufuk Celebi 
Authored: Tue Feb 9 16:06:46 2016 +0100
Committer: Ufuk Celebi 
Committed: Wed Feb 10 19:51:59 2016 +0100

--
 .../flink/runtime/checkpoint/CheckpointCoordinator.java | 12 ++--
 1 file changed, 10 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8df0bbac/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 9963a20..b0e23d6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -197,9 +197,9 @@ public class CheckpointCoordinator {
this.completedCheckpointStore = 
checkNotNull(completedCheckpointStore);
this.recentPendingCheckpoints = new 
ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS);
this.userClassLoader = userClassLoader;
-   this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
 
-   checkpointIDCounter.start();
+   // Started with the periodic scheduler
+   this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
 
this.timer = new Timer("Checkpoint Timer", true);
 
@@ -862,6 +862,14 @@ public class CheckpointCoordinator {
// make sure all prior timers are cancelled
stopCheckpointScheduler();
 
+   try {
+   // Multiple start calls are OK
+   checkpointIdCounter.start();
+   } catch (Exception e) {
+   String msg = "Failed to start checkpoint ID 
counter: " + e.getMessage();
+   throw new RuntimeException(msg, e);
+   }
+
periodicScheduling = true;
currentPeriodicTrigger = new ScheduledTrigger();
timer.scheduleAtFixedRate(currentPeriodicTrigger, 
baseInterval, baseInterval);
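
A minimal sketch (all names hypothetical, not from the commit) of the
idempotent-start contract that the "Multiple start calls are OK" comment
above relies on:

import java.util.concurrent.atomic.AtomicBoolean;

// Only the first start() performs the potentially blocking setup; later
// calls from the periodic scheduler are harmless no-ops.
class IdempotentStartCounter {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private long value;

    void start() {
        if (started.compareAndSet(false, true)) {
            value = 0L; // stand-in for fetching the initial count from ZooKeeper
        }
    }

    long getAndIncrement() {
        return value++;
    }
}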



[32/50] [abbrv] flink git commit: [FLINK-3358] [FLINK-3351] [rocksdb] Add simple constructors and automatic temp path configuration

2016-02-12 Thread fhueske
[FLINK-3358] [FLINK-3351] [rocksdb] Add simple constructors and automatic temp 
path configuration

This adds constructors that only take a backup dir URI and use it to initialize
both the RocksDB file backups and the FileSystem state backend for 
non-partitioned
state.

Also, the RocksDBStateBackend now automatically picks up the TaskManager's temp 
directories,
if no local storage directories are explicitly configured.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/edae7934
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/edae7934
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/edae7934

Branch: refs/heads/tableOnCalcite
Commit: edae79340dd486915d25109cbdc1485accae665a
Parents: be72758
Author: Stephan Ewen 
Authored: Thu Feb 11 21:30:36 2016 +0100
Committer: Stephan Ewen 
Committed: Thu Feb 11 21:34:03 2016 +0100

--
 .../streaming/state/RocksDBStateBackend.java| 241 ++--
 .../state/RocksDBStateBackendConfigTest.java| 280 +++
 .../state/RocksDBStateBackendTest.java  |   4 +-
 .../state/filesystem/FsStateBackend.java|  99 ---
 .../EventTimeWindowCheckpointingITCase.java |  13 +-
 5 files changed, 566 insertions(+), 71 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/edae7934/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
--
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index eddd8c0..5b16e86 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -18,7 +18,12 @@
 package org.apache.flink.contrib.streaming.state;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
@@ -28,13 +33,17 @@ import 
org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.api.common.state.StateBackend;
 
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.rocksdb.Options;
 import org.rocksdb.StringAppendOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static java.util.Objects.requireNonNull;
 
@@ -54,20 +63,35 @@ import static java.util.Objects.requireNonNull;
 public class RocksDBStateBackend extends AbstractStateBackend {
private static final long serialVersionUID = 1L;
 
-   /** Base path for RocksDB directory. */
-   private final String dbBasePath;
-
-   /** The checkpoint directory that we snapshot RocksDB backups to. */
-   private final String checkpointDirectory;
+   private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBStateBackend.class);
+   
+   
+   /** The checkpoint directory that we copy the RocksDB backups to. */
+   private final Path checkpointDirectory;
 
+   /** The state backend that stores the non-partitioned state */
+   private final AbstractStateBackend nonPartitionedStateBackend;
+   
+   
/** Operator identifier that is used to uniqueify the RocksDB storage 
path. */
private String operatorIdentifier;
 
/** JobID for uniquifying backup paths. */
private JobID jobId;
+   
 
-   private AbstractStateBackend backingStateBackend;
+   // DB storage directories
+   
+   /** Base paths for RocksDB directory, as configured. May be null. */
+   private Path[] dbBasePaths;
 
+   /** Base paths for RocksDB directory, as initialized */
+   private File[] dbStorageDirectories;
+   
+   private int nextDirectory;
+   
+   // RocksDB options
+   
/** The pre-configured option settings */
private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT;

@@ -79,31 +103,112 @@ public class RocksDBS
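
A hedged usage sketch of the simplified construction path described above; the
URI is a placeholder, and the exact constructor signature (String vs. URI,
checked exceptions) should be verified against the committed class:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // One URI configures both the RocksDB backups and the FsStateBackend
        // used for non-partitioned state; local RocksDB directories fall back
        // to the TaskManager temp dirs when not set explicitly.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
    }
}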

[26/50] [abbrv] flink git commit: [FLINK-3384] [kafka] Add ClosableQueue for message exchanges between Kafka Threads

2016-02-12 Thread fhueske
[FLINK-3384] [kafka] Add ClosableQueue for message exchanges between Kafka 
Threads


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd324ea7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd324ea7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd324ea7

Branch: refs/heads/tableOnCalcite
Commit: fd324ea72979cc3d4202ffa3ea174ec4cc9d153b
Parents: 50bd65a
Author: Stephan Ewen 
Authored: Wed Feb 10 14:51:10 2016 +0100
Committer: Stephan Ewen 
Committed: Wed Feb 10 22:15:32 2016 +0100

--
 .../kafka/internals/ClosableBlockingQueue.java  | 502 +++
 .../internals/ClosableBlockingQueueTest.java| 603 +++
 2 files changed, 1105 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/fd324ea7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
--
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
new file mode 100644
index 000..856c2ad
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A special form of blocking queue with two additions:
+ * 
+ * The queue can be closed atomically when empty. Adding elements 
after the queue
+ * is closed fails. This allows queue consumers to atomically discover 
that no elements
+ * are available and mark themselves as shut down.
+ * The queue allows polling batches of elements in a single 
call.
+ * 
+ * 
+ * The queue has no capacity restriction and is safe for multiple producers 
and consumers.
+ * 
+ * Note: Null elements are prohibited.
+ * 
+ * @param <E> The type of elements in the queue.
+ */
+public class ClosableBlockingQueue<E> {
+
+   /** The lock used to make queue accesses and open checks atomic */
+   private final ReentrantLock lock;
+   
+   /** The condition on which blocking get-calls wait if the queue is 
empty */
+   private final Condition nonEmpty;
+   
+   /** The deque of elements */
+   private final ArrayDeque<E> elements;
+   
+   /** Flag marking the status of the queue */
+   private volatile boolean open;
+   
+   // 

+
+   /**
+* Creates a new empty queue.
+*/
+   public ClosableBlockingQueue() {
+   this(10);
+   }
+
+   /**
+* Creates a new empty queue, reserving space for at least the 
specified number
+* of elements. The queue can still grow if more elements are added 
than the
+* reserved space.
+* 
+* @param initialSize The number of elements to reserve space for.
+*/
+   public ClosableBlockingQueue(int initialSize) {
+   this.lock = new ReentrantLock(true);
+   this.nonEmpty = this.lock.newCondition();
+   
+   this.elements = new ArrayDeque<>(initialSize);
+   this.open = true;
+   
+   
+   }
+
+   /**
+* Creates a new queue that contains the given eleme
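
A hedged consumer sketch built on the semantics in the Javadoc above;
addIfOpen(), getBatchBlocking(), and a boolean close() that succeeds only on
an empty queue are assumptions about the parts of the class not shown in this
excerpt:

import java.util.List;

public final class QueueConsumerSketch {

    static void consume(ClosableBlockingQueue<String> queue) throws InterruptedException {
        // close() is assumed to succeed only once the queue is empty, letting
        // the consumer atomically discover "no more elements" and shut down.
        while (!queue.close()) {
            List<String> batch = queue.getBatchBlocking(); // blocks until non-empty
            for (String element : batch) {
                System.out.println(element);
            }
        }
    }
}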

[04/50] [abbrv] flink git commit: [hotfix] Update comments in 'ChainingStrategy' and remove outdated 'FORCE_ALWAYS' constant

2016-02-12 Thread fhueske
[hotfix] Update comments in 'ChainingStrategy' and remove outdated 
'FORCE_ALWAYS' constant


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e9c83ea2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e9c83ea2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e9c83ea2

Branch: refs/heads/tableOnCalcite
Commit: e9c83ea2c36decf02d0ea9c2b76b0fd50606b51b
Parents: 28feede
Author: Stephan Ewen 
Authored: Mon Feb 8 15:08:09 2016 +0100
Committer: Stephan Ewen 
Committed: Mon Feb 8 20:19:25 2016 +0100

--
 .../api/graph/StreamingJobGraphGenerator.java   |  9 ++
 .../api/operators/ChainingStrategy.java | 30 +++-
 2 files changed, 19 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/e9c83ea2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index fd75ba7..c0d2856 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -398,15 +398,12 @@ public class StreamingJobGraphGenerator {
&& headOperator != null
&& upStreamVertex.getSlotSharingID() == 
downStreamVertex.getSlotSharingID()
&& upStreamVertex.getSlotSharingID() != -1
-   && (outOperator.getChainingStrategy() == 
ChainingStrategy.ALWAYS ||
-   outOperator.getChainingStrategy() == 
ChainingStrategy.FORCE_ALWAYS)
+   && outOperator.getChainingStrategy() == 
ChainingStrategy.ALWAYS
&& (headOperator.getChainingStrategy() == 
ChainingStrategy.HEAD ||
-   headOperator.getChainingStrategy() == 
ChainingStrategy.ALWAYS ||
-   headOperator.getChainingStrategy() == 
ChainingStrategy.FORCE_ALWAYS)
+   headOperator.getChainingStrategy() == 
ChainingStrategy.ALWAYS)
&& (edge.getPartitioner() instanceof 
ForwardPartitioner)
&& upStreamVertex.getParallelism() == 
downStreamVertex.getParallelism()
-   && (streamGraph.isChainingEnabled() ||
-   outOperator.getChainingStrategy() == 
ChainingStrategy.FORCE_ALWAYS);
+   && streamGraph.isChainingEnabled();
}
 
private void setSlotSharing() {

http://git-wip-us.apache.org/repos/asf/flink/blob/e9c83ea2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java
index 18e8858..1bf3259 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java
@@ -18,23 +18,22 @@
 
 package org.apache.flink.streaming.api.operators;
 
-
 /**
- * Defines the chaining scheme for the operator.
- * By default {@link #ALWAYS} is used, which means operators will be eagerly 
chained whenever possible.
+ * Defines the chaining scheme for the operator. When an operator is chained 
to the
+ * predecessor, it means that they run in the same thread. They become one 
operator
+ * consisting of multiple steps.
+ * 
+ * The default value used by the {@link StreamOperator} is {@link #HEAD}, 
which means that
+ * the operator is not chained to its predecessor. Most operators override 
this with
+ * {@link #ALWAYS}, meaning they will be chained to predecessors whenever 
possible.
  */
 public enum ChainingStrategy {
 
-   /**
-* Chaining will happen even if chaining is disabled on the execution 
environment.
-* This should only be used by system-level operators, not operators 
implemented by users.
-*/
-   FORCE_ALWAYS,
-
/** 
-* Operators will be eagerly chained whenever possible, for
-* maximal performance. It is generally a good practice to allow maximal
-  
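
For orientation, a hedged sketch of the user-facing controls that feed these
strategies; `input` is assumed to be an existing DataStream<String> (the Scala
counterparts of startNewChain() and disableChaining() appear in the FLINK-3366
diff earlier in this digest):

// Chaining is on by default; startNewChain() makes the map operator the head
// of a fresh chain, and disableChaining() keeps the filter out of any chain.
DataStream<String> result = input
        .map(s -> s.trim()).startNewChain()
        .filter(s -> !s.isEmpty()).disableChaining();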

[10/50] [abbrv] flink git commit: [FLINK-3373] [build] Revert HTTP Components versions because of incompatibility with Hadoop >= 2.6.0

2016-02-12 Thread fhueske
[FLINK-3373] [build] Revert HTTP Components versions because of incompatibility 
with Hadoop >= 2.6.0


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de231d74
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de231d74
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de231d74

Branch: refs/heads/tableOnCalcite
Commit: de231d74650600ae84a76ee267aeab6ddd0ff596
Parents: 72f8228
Author: Stephan Ewen 
Authored: Tue Feb 9 20:38:43 2016 +0100
Committer: Stephan Ewen 
Committed: Tue Feb 9 20:38:43 2016 +0100

--
 pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/de231d74/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 81528c8..0a64c4a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -344,13 +344,13 @@ under the License.

org.apache.httpcomponents
httpcore
-   4.4.4
+   4.2.5

 

org.apache.httpcomponents
httpclient
-   4.5.1
+   4.2.6

 




[30/50] [abbrv] flink git commit: [FLINK-3271] [build] Don't exclude jetty-util dependency from the Hadoop dependencies

2016-02-12 Thread fhueske
[FLINK-3271] [build] Don't exclude jetty-util dependency from the Hadoop 
dependencies

This closes #1543


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/82c7383a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/82c7383a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/82c7383a

Branch: refs/heads/tableOnCalcite
Commit: 82c7383a67a8349d05dc98780667b3a47ab3cc54
Parents: dcea46e
Author: Abhishek Agarwal 
Authored: Sun Jan 24 20:22:45 2016 +0530
Committer: Stephan Ewen 
Committed: Thu Feb 11 21:34:03 2016 +0100

--
 .../flink-shaded-hadoop1/pom.xml| 12 --
 .../flink-shaded-hadoop2/pom.xml| 41 +---
 2 files changed, 1 insertion(+), 52 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/82c7383a/flink-shaded-hadoop/flink-shaded-hadoop1/pom.xml
--
diff --git a/flink-shaded-hadoop/flink-shaded-hadoop1/pom.xml 
b/flink-shaded-hadoop/flink-shaded-hadoop1/pom.xml
index 15082aa..e8634f5 100644
--- a/flink-shaded-hadoop/flink-shaded-hadoop1/pom.xml
+++ b/flink-shaded-hadoop/flink-shaded-hadoop1/pom.xml
@@ -69,10 +69,6 @@ under the License.
jsp-2.1


-   org.mortbay.jetty
-   jetty-util
-   
-   
org.eclipse.jdt
core

@@ -81,14 +77,6 @@ under the License.
servlet-api


-   org.mortbay.jetty
-   jetty
-   
-   
-   org.mortbay.jetty
-   jetty-util
-   
-   
com.sun.jersey
jersey-core


http://git-wip-us.apache.org/repos/asf/flink/blob/82c7383a/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml
--
diff --git a/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml 
b/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml
index 5eb8043..c642653 100644
--- a/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml
+++ b/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml
@@ -68,10 +68,7 @@ under the License.
org.mortbay.jetty
jsp-2.1

-   
-   org.mortbay.jetty
-   jetty-util
-   
+

org.eclipse.jdt
core
@@ -81,10 +78,6 @@ under the License.
jetty


-   org.mortbay.jetty
-   jetty-util
-   
-   
com.sun.jersey
jersey-json

@@ -185,10 +178,6 @@ under the License.
jsp-2.1


-   org.mortbay.jetty
-   jetty-util
-   
-   
org.eclipse.jdt
core

@@ -197,10 +186,6 @@ under the License.
jetty


-   org.mortbay.jetty
-   jetty-util
-   
-   
com.sun.jersey
jersey-json

@@ -301,10 +286,6 @@ under the License.
jsp-2.1


-   org.mortbay.jetty
-  

[22/50] [abbrv] flink git commit: [hotfix] [tests] Reset state to allow retry on failure

2016-02-12 Thread fhueske
[hotfix] [tests] Reset state to allow retry on failure

This closes #1611


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48b74546
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48b74546
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48b74546

Branch: refs/heads/tableOnCalcite
Commit: 48b745466202ebbb68608930e13cb6ed4a35e6e7
Parents: 756cbaf
Author: Ufuk Celebi 
Authored: Tue Feb 9 12:45:41 2016 +0100
Committer: Stephan Ewen 
Committed: Wed Feb 10 15:27:41 2016 +0100

--
 .../JobManagerCheckpointRecoveryITCase.java  | 19 ---
 1 file changed, 12 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/48b74546/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
--
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
index 59a05ff..ea30c58 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerCheckpointRecoveryITCase.java
@@ -116,15 +116,15 @@ public class JobManagerCheckpointRecoveryITCase extends 
TestLogger {
 
private static final int Parallelism = 8;
 
-   private static final CountDownLatch CompletedCheckpointsLatch = new 
CountDownLatch(2);
+   private static CountDownLatch CompletedCheckpointsLatch = new 
CountDownLatch(2);
 
-   private static final AtomicLongArray RecoveredStates = new 
AtomicLongArray(Parallelism);
+   private static AtomicLongArray RecoveredStates = new 
AtomicLongArray(Parallelism);
 
-   private static final CountDownLatch FinalCountLatch = new 
CountDownLatch(1);
+   private static CountDownLatch FinalCountLatch = new CountDownLatch(1);
 
-   private static final AtomicReference<Long> FinalCount = new 
AtomicReference<>();
+   private static AtomicReference<Long> FinalCount = new 
AtomicReference<>();
 
-   private static final long LastElement = -1;
+   private static long LastElement = -1;
 
/**
 * Simple checkpointed streaming sum.
@@ -156,7 +156,6 @@ public class JobManagerCheckpointRecoveryITCase extends 
TestLogger {
Configuration config = 
ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(ZooKeeper
.getConnectString(), 
FileStateBackendBasePath.getAbsoluteFile().toURI().toString());
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
Parallelism);
-   config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
 
ActorSystem testSystem = null;
JobManagerProcess[] jobManagerProcess = new 
JobManagerProcess[2];
@@ -248,6 +247,13 @@ public class JobManagerCheckpointRecoveryITCase extends 
TestLogger {
}
}
catch (Throwable t) {
+   // Reset all static state for test retries
+   CompletedCheckpointsLatch = new CountDownLatch(2);
+   RecoveredStates = new AtomicLongArray(Parallelism);
+   FinalCountLatch = new CountDownLatch(1);
+   FinalCount = new AtomicReference<>();
+   LastElement = -1;
+
// Print early (in some situations the process logs get 
too big
// for Travis and the root problem is not shown)
t.printStackTrace();
@@ -303,7 +309,6 @@ public class JobManagerCheckpointRecoveryITCase extends 
TestLogger {
fileStateBackendPath);
 
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 2);
-   config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
 
JobManagerProcess[] jobManagerProcess = new 
JobManagerProcess[2];
LeaderRetrievalService leaderRetrievalService = null;



[41/50] [abbrv] flink git commit: [FLINK-3225] Implemented optimization of Table API queries via Calcite

2016-02-12 Thread fhueske
http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operators/DataSetTable.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operators/DataSetTable.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operators/DataSetTable.scala
deleted file mode 100644
index 65b97fb..000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operators/DataSetTable.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.operators
-
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-import org.apache.calcite.schema.impl.AbstractTable
-import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.plan.TypeConverter
-
-class DataSetTable[T](
-  val dataSet: DataSet[T],
-  val fieldNames: Array[String]) extends AbstractTable {
-
-  // check uniquenss of field names
-  if (fieldNames.length != fieldNames.toSet.size) {
-throw new scala.IllegalArgumentException(
-  "Table field names must be unique.")
-  }
-
-  val dataSetType: CompositeType[T] =
-dataSet.getType match {
-  case cType: CompositeType[T] =>
-cType
-  case _ =>
-throw new scala.IllegalArgumentException(
-  "DataSet must have a composite type.")
-}
-
-  val fieldTypes: Array[SqlTypeName] =
-if (fieldNames.length == dataSetType.getArity) {
-  (0 until dataSetType.getArity)
-.map(i => dataSetType.getTypeAt(i))
-.map(TypeConverter.typeInfoToSqlType)
-.toArray
-}
-else {
-  throw new IllegalArgumentException(
-"Arity of DataSet type not equal to number of field names.")
-}
-
-  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
-val builder = typeFactory.builder
-fieldNames.zip(fieldTypes)
-  .foreach( f => builder.add(f._1, f._2).nullable(true) )
-builder.build
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5e4065/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
new file mode 100644
index 000..97e8b32
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules
+
+import org.apache.calcite.rel.rules._
+import org.apache.calcite.tools.{RuleSets, RuleSet}
+import org.apache.flink.api.table.plan.rules.logical._
+import org.apache.flink.api.table.plan.rules.dataset._
+
+object FlinkRuleSets {
+
+  /**
+* RuleSet to optimize plans for batch / DataSet execution
+*/
+  val DATASET_OPT_RULES: RuleSet = RuleSets.ofList(
+
+// filter rules
+FilterJoinRule.FILTER_ON_JOIN,
+FilterJoinRule.JOIN,
+Fil
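
A hedged sketch of how a rule set like the one assembled above is consumed;
`planner` and `logicalPlan` are placeholders for the planner instance and the
translated logical plan, mirroring the three-argument run(...) call in
JavaBatchTranslator further up in this digest:

// A RuleSet is Iterable over planner rules, so it plugs directly into
// Calcite's Programs utility.
Program optimizer = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES);
RelNode optimized = optimizer.run(planner, logicalPlan, RelTraitSet.createEmpty());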

[48/50] [abbrv] flink git commit: [FLINK-3282] add FlinkRelNode interface.

2016-02-12 Thread fhueske
[FLINK-3282] add FlinkRelNode interface.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/20235e0a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/20235e0a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/20235e0a

Branch: refs/heads/tableOnCalcite
Commit: 20235e0afaf5de799e71094d72ae7e47337ea82d
Parents: 4abca1d
Author: chengxiang li 
Authored: Wed Jan 27 11:37:43 2016 +0800
Committer: Fabian Hueske 
Committed: Fri Feb 12 11:34:09 2016 +0100

--
 .../api/table/sql/calcite/DataSetRelNode.java   | 29 +++
 .../table/sql/calcite/node/DataSetExchange.java | 60 +++
 .../table/sql/calcite/node/DataSetFlatMap.java  | 56 ++
 .../api/table/sql/calcite/node/DataSetJoin.java | 80 
 .../api/table/sql/calcite/node/DataSetMap.java  | 58 ++
 .../table/sql/calcite/node/DataSetReduce.java   | 58 ++
 .../sql/calcite/node/DataSetReduceGroup.java| 62 +++
 .../api/table/sql/calcite/node/DataSetSort.java | 59 +++
 .../table/sql/calcite/node/DataSetSource.java   | 55 ++
 .../table/sql/calcite/node/DataSetUnion.java| 51 +
 10 files changed, 568 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/20235e0a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/DataSetRelNode.java
--
diff --git 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/DataSetRelNode.java
 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/DataSetRelNode.java
new file mode 100644
index 000..df0ebc0
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/DataSetRelNode.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.sql.calcite;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.flink.api.java.DataSet;
+
+public interface DataSetRelNode extends RelNode {
+   
+   /**
+* Translate the FlinkRelNode into a Flink operator.
+*/
+   DataSet translateToPlan();
+}
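
A hypothetical implementation sketch of the interface above; the node name and
pass-through translation are illustrative, and raw types are kept because the
archive may have stripped generic parameters from the original:

import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
import org.apache.flink.api.java.DataSet;

// A single-input relational node that translates itself by delegating to
// its child; a real node would apply its own operation on top.
public class DataSetIdentity extends SingleRel implements DataSetRelNode {

    protected DataSetIdentity(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
        super(cluster, traits, input);
    }

    @Override
    public DataSet translateToPlan() {
        // translate the input subtree first; this node adds no operation
        return ((DataSetRelNode) getInput()).translateToPlan();
    }
}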

http://git-wip-us.apache.org/repos/asf/flink/blob/20235e0a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetExchange.java
--
diff --git 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetExchange.java
 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetExchange.java
new file mode 100644
index 000..1ddd884
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/sql/calcite/node/DataSetExchange.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.sql.calcite.node;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import 
org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
+import org.apa

[05/50] [abbrv] flink git commit: [hotfix] Cleanup routing of records in OperatorChain

2016-02-12 Thread fhueske
[hotfix] Cleanup routing of records in OperatorChain


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28c6254e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28c6254e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28c6254e

Branch: refs/heads/tableOnCalcite
Commit: 28c6254ee385fe746e868a81b2207bf66b552174
Parents: e9c83ea
Author: Stephan Ewen 
Authored: Mon Feb 8 16:14:00 2016 +0100
Committer: Stephan Ewen 
Committed: Mon Feb 8 20:36:35 2016 +0100

--
 .../BroadcastOutputSelectorWrapper.java |  45 ---
 .../api/collector/selector/DirectedOutput.java  | 130 +++
 .../selector/DirectedOutputSelectorWrapper.java |  97 --
 .../selector/OutputSelectorWrapper.java |   9 +-
 .../selector/OutputSelectorWrapperFactory.java  |  33 -
 .../flink/streaming/api/graph/StreamConfig.java |  20 +--
 .../flink/streaming/api/graph/StreamNode.java   |  10 +-
 .../api/graph/StreamingJobGraphGenerator.java   |   2 +-
 .../streaming/runtime/io/CollectorWrapper.java  |  61 -
 .../streaming/runtime/tasks/OperatorChain.java  |  84 ++--
 .../flink/streaming/api/OutputSplitterTest.java |   2 +-
 .../runtime/tasks/StreamTaskTestHarness.java|   9 +-
 12 files changed, 225 insertions(+), 277 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
deleted file mode 100644
index 7034b11..000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector.selector;
-
-import java.util.ArrayList;
-
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-public class BroadcastOutputSelectorWrapper<OUT> implements 
OutputSelectorWrapper<OUT> {
-
-   private static final long serialVersionUID = 1L;
-   
-   private final ArrayList<Collector<StreamRecord<OUT>>> outputs;
-
-   public BroadcastOutputSelectorWrapper() {
-   outputs = new ArrayList<Collector<StreamRecord<OUT>>>();
-   }
-   
-   @Override
-   public void addCollector(Collector<StreamRecord<OUT>> output, 
StreamEdge edge) {
-   outputs.add(output);
-   }
-
-   @Override
-   public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT 
record) {
-   return outputs;
-   }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
new file mode 100644
index 000..52c50b3
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ 

[02/50] [abbrv] flink git commit: [hotfix] typo IllegalArgumentException

2016-02-12 Thread fhueske
[hotfix] typo IllegalArgumentException

This closes #1602


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c47f385
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c47f385
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c47f385

Branch: refs/heads/tableOnCalcite
Commit: 5c47f3854938fc789ac0fd0867451f4def155c90
Parents: d51bec1
Author: Andrea Sella 
Authored: Mon Feb 8 16:38:38 2016 +0100
Committer: Stephan Ewen 
Committed: Mon Feb 8 20:18:19 2016 +0100

--
 .../java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/5c47f385/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
--
diff --git 
a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
 
b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
index 84eb309..132edc4 100644
--- 
a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ 
b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -357,10 +357,10 @@ public class JDBCInputFormat extends 
RichInputFormat

[49/50] [abbrv] flink git commit: [FLINK-3226] Add DataSet scan and conversion to DataSet[Row]

2016-02-12 Thread fhueske
[FLINK-3226] Add DataSet scan and conversion to DataSet[Row]

This closes #1579.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99f60c84
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99f60c84
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99f60c84

Branch: refs/heads/tableOnCalcite
Commit: 99f60c84092a5c656591d68f4d4450f14f88b9ba
Parents: 3cb76fc
Author: Fabian Hueske 
Authored: Tue Feb 2 17:15:28 2016 +0100
Committer: Fabian Hueske 
Committed: Fri Feb 12 11:34:09 2016 +0100

--
 .../api/java/table/JavaBatchTranslator.scala|   2 +
 .../api/scala/table/ScalaBatchTranslator.scala  |   8 +-
 .../flink/api/table/plan/PlanTranslator.scala   |  87 ---
 .../flink/api/table/plan/TypeConverter.scala|  15 ++
 .../plan/nodes/dataset/DataSetSource.scala  | 150 ++-
 .../plan/rules/dataset/DataSetScanRule.scala|   8 +-
 .../api/table/plan/schema/DataSetTable.scala|  41 ++---
 .../flink/api/java/table/test/AsITCase.java |  63 ++--
 .../flink/api/java/table/test/FilterITCase.java |  15 +-
 .../flink/api/java/table/test/SelectITCase.java |  15 +-
 .../flink/api/scala/table/test/AsITCase.scala   |  40 -
 .../api/scala/table/test/SelectITCase.scala |  28 ++--
 12 files changed, 380 insertions(+), 92 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/99f60c84/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
index 7e91190..f70f477 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
@@ -40,11 +40,13 @@ class JavaBatchTranslator extends PlanTranslator {
 
   override def createTable[A](
   repr: Representation[A],
+  fieldIndexes: Array[Int],
   fieldNames: Array[String]): Table = {
 
 // create table representation from DataSet
 val dataSetTable = new DataSetTable[A](
   repr.asInstanceOf[JavaDataSet[A]],
+  fieldIndexes,
   fieldNames
 )
 

http://git-wip-us.apache.org/repos/asf/flink/blob/99f60c84/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
index 1c453fa..cc92c37 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
@@ -38,8 +38,12 @@ class ScalaBatchTranslator extends PlanTranslator {
 
   type Representation[A] = DataSet[A]
 
-  override def createTable[A](repr: Representation[A], fieldNames: 
Array[String]): Table = {
-javaTranslator.createTable(repr.javaSet, fieldNames)
+  override def createTable[A](
+repr: Representation[A],
+fieldIndexes: Array[Int],
+fieldNames: Array[String]): Table =
+  {
+javaTranslator.createTable(repr.javaSet, fieldIndexes, fieldNames)
   }
 
   override def translate[O](op: RelNode)(implicit tpe: TypeInformation[O]): 
DataSet[O] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/99f60c84/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
index 4e97f83..af22768 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
@@ -18,10 +18,12 @@
 package org.apache.flink.api.table.plan
 
 import org.apache.calcite.rel.RelNode
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo

[17/50] [abbrv] flink git commit: [FLINK-3373] [build] Shade away Hadoop's HTTP Components dependency

2016-02-12 Thread fhueske
[FLINK-3373] [build] Shade away Hadoop's HTTP Components dependency

This closes #1615


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ccd7544
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ccd7544
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ccd7544

Branch: refs/heads/tableOnCalcite
Commit: 8ccd7544edb25e82cc8a898809cc7c8bb7893620
Parents: aeee6ef
Author: Stephan Ewen 
Authored: Tue Feb 9 21:18:43 2016 +0100
Committer: Stephan Ewen 
Committed: Wed Feb 10 15:01:22 2016 +0100

--
 flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml | 15 +++
 flink-shaded-hadoop/pom.xml  | 13 +
 pom.xml  | 12 
 3 files changed, 28 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8ccd7544/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml
--
diff --git a/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml 
b/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml
index b5839d9..5eb8043 100644
--- a/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml
+++ b/flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml
@@ -652,4 +652,19 @@ under the License.


 
+	<dependencyManagement>
+		<dependencies>
+			<dependency>
+				<groupId>org.apache.httpcomponents</groupId>
+				<artifactId>httpcore</artifactId>
+				<version>4.2.5</version>
+			</dependency>
+
+			<dependency>
+				<groupId>org.apache.httpcomponents</groupId>
+				<artifactId>httpclient</artifactId>
+				<version>4.2.6</version>
+			</dependency>
+		</dependencies>
+	</dependencyManagement>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8ccd7544/flink-shaded-hadoop/pom.xml
--
diff --git a/flink-shaded-hadoop/pom.xml b/flink-shaded-hadoop/pom.xml
index 7d54ef9..d5a8529 100644
--- a/flink-shaded-hadoop/pom.xml
+++ b/flink-shaded-hadoop/pom.xml
@@ -111,6 +111,11 @@ under the License.

								<include>io.netty:netty:*</include>
								<include>org.apache.curator:*</include>
								<include>org.apache.hadoop:*</include>
+
+								<include>net.java.dev.jets3t:jets3t</include>
+								<include>org.apache.httpcomponents:*</include>
+								<include>commons-httpclient:commons-httpclient</include>
+
commons-httpclient:commons-httpclient



@@ -133,6 +138,14 @@ under the License.

						<relocation>
							<pattern>org.apache.curator</pattern>
							<shadedPattern>org.apache.flink.hadoop.shaded.org.apache.curator</shadedPattern>
						</relocation>
+						<relocation>
+							<pattern>org.apache.http</pattern>
+							<shadedPattern>org.apache.flink.hadoop.shaded.org.apache.http</shadedPattern>
+						</relocation>
+						<relocation>
+							<pattern>org.apache.commons.httpclient</pattern>
+							<shadedPattern>org.apache.flink.hadoop.shaded.org.apache.commons.httpclient</shadedPattern>
+						</relocation>




http://git-wip-us.apache.org/repos/asf/flink/blob/8ccd7544/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 0a64c4a..42ebf79 100644
--- a/pom.xml
+++ b/pom.xml
@@ -342,18 +342,6 @@ under the License.

 

-			<dependency>
-				<groupId>org.apache.httpcomponents</groupId>
-				<artifactId>httpcore</artifactId>
-				<version>4.2.5</version>
-			</dependency>
-
-			<dependency>
-				<groupId>org.apache.httpcomponents</groupId>
-				<artifactId>httpclient</artifactId>
-				<version>4.2.6</version>
-			</dependency>
-

[47/50] [abbrv] flink git commit: [FLINK-3226] implement GroupReduce translation; enable tests for supported operations

2016-02-12 Thread fhueske
[FLINK-3226] implement GroupReduce translation; enable tests for supported 
operations

Squashes the following commits:
- Compute average as sum and count for byte, short and int type to avoid 
rounding errors
- Move aggregation functions to org.apache.flink.table.runtime
- Remove join-related changes
- Change integer average aggregations to maintain sum and count
- Long average uses a BigInteger sum
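
The last two points boil down to one pattern: keep an exact sum plus a count,
and divide only once at the end. A standalone Java sketch of that scheme (the
class name is illustrative, not one of the committed aggregate classes):

import java.math.BigInteger;

// byte/short/int averages keep an exact long sum and a count; the long
// average widens the sum to BigInteger so it cannot overflow.
class LongAvgAccumulator {

    private BigInteger sum = BigInteger.ZERO;
    private long count;

    void aggregate(long value) {
        sum = sum.add(BigInteger.valueOf(value));
        count++;
    }

    long getResult() {
        // Divide once, at the very end, on the exact sum: no per-element
        // rounding errors can accumulate.
        return sum.divide(BigInteger.valueOf(count)).longValue();
    }
}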


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/18e7f2f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/18e7f2f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/18e7f2f4

Branch: refs/heads/tableOnCalcite
Commit: 18e7f2f4e0654ad2f00dcf7d1351846345accb13
Parents: 6dd2d77
Author: vasia 
Authored: Thu Feb 4 15:53:52 2016 +0100
Committer: Fabian Hueske 
Committed: Fri Feb 12 11:34:09 2016 +0100

--
 .../flink/api/table/plan/TypeConverter.scala|   2 +-
 .../plan/functions/AggregateFunction.scala  |  71 -
 .../table/plan/functions/FunctionUtils.scala|  37 -
 .../plan/functions/aggregate/Aggregate.scala|  42 --
 .../functions/aggregate/AggregateFactory.scala  | 135 -
 .../plan/functions/aggregate/AvgAggregate.scala | 148 ---
 .../functions/aggregate/CountAggregate.scala|  34 -
 .../plan/functions/aggregate/MaxAggregate.scala | 136 -
 .../plan/functions/aggregate/MinAggregate.scala | 136 -
 .../plan/functions/aggregate/SumAggregate.scala | 130 
 .../plan/nodes/dataset/DataSetGroupReduce.scala |  30 +++-
 .../table/plan/nodes/dataset/DataSetJoin.scala  |   6 +-
 .../plan/nodes/logical/FlinkAggregate.scala |  16 --
 .../api/table/plan/rules/FlinkRuleSets.scala|   3 +-
 .../rules/dataset/DataSetAggregateRule.scala|  13 +-
 .../plan/rules/dataset/DataSetJoinRule.scala| 102 +
 .../api/table/runtime/AggregateFunction.scala   |  76 ++
 .../api/table/runtime/aggregate/Aggregate.scala |  42 ++
 .../runtime/aggregate/AggregateFactory.scala| 136 +
 .../table/runtime/aggregate/AvgAggregate.scala  | 131 
 .../runtime/aggregate/CountAggregate.scala  |  34 +
 .../table/runtime/aggregate/MaxAggregate.scala  |  84 +++
 .../table/runtime/aggregate/MinAggregate.scala  |  86 +++
 .../table/runtime/aggregate/SumAggregate.scala  |  50 +++
 .../api/java/table/test/AggregationsITCase.java |  13 +-
 .../api/java/table/test/ExpressionsITCase.java  |   2 -
 .../flink/api/java/table/test/FilterITCase.java |   2 -
 .../table/test/GroupedAggregationsITCase.java   |   6 +-
 .../flink/api/java/table/test/SelectITCase.java |   2 -
 .../flink/api/java/table/test/UnionITCase.java  |   1 -
 .../scala/table/test/AggregationsITCase.scala   |  11 +-
 .../scala/table/test/ExpressionsITCase.scala|   1 -
 .../table/test/GroupedAggregationsITCase.scala  |   6 +-
 33 files changed, 704 insertions(+), 1020 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
index b7cb200..1fc482a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
@@ -135,7 +135,7 @@ object TypeConverter {
 logicalFieldTypes.head
   }
   else {
-new TupleTypeInfo[Any](logicalFieldTypes.toArray:_*)
+new TupleTypeInfo[Tuple](logicalFieldTypes.toArray:_*)
   }
 }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/18e7f2f4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala
deleted file mode 100644
index 4abf2d2..000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright o

[29/50] [abbrv] flink git commit: [FLINK-3364] [runtime, yarn] Move SavepointStore initialization out of JobManager constructor

2016-02-12 Thread fhueske
[FLINK-3364] [runtime, yarn] Move SavepointStore initialization out of 
JobManager constructor

This closes #1622.
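
Schematically, the change is plain constructor injection; a Java sketch of the
before/after shapes (the JobManager itself is a Scala actor, so this is
illustrative only, and the import package is an assumption):

import org.apache.flink.configuration.Configuration;
// Assumed package; SavepointStore/SavepointStoreFactory are the classes from the diff.
import org.apache.flink.runtime.checkpoint.SavepointStore;
import org.apache.flink.runtime.checkpoint.SavepointStoreFactory;

// Before: the dependency is created inside the constructor, so construction
// can fail in the factory call and tests cannot substitute a store.
class JobManagerBefore {
    private final SavepointStore savepointStore;

    JobManagerBefore(Configuration config) throws Exception {
        this.savepointStore = SavepointStoreFactory.createFromConfig(config);
    }
}

// After: the caller builds the store once, next to the other components,
// and injects it; subclasses (e.g. the YARN job manager) just pass it along.
class JobManagerAfter {
    private final SavepointStore savepointStore;

    JobManagerAfter(SavepointStore savepointStore) {
        this.savepointStore = savepointStore;
    }
}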


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dcea46e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dcea46e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dcea46e8

Branch: refs/heads/tableOnCalcite
Commit: dcea46e891a1479205fdfe939858d340cde87d57
Parents: ed7d3da
Author: Ufuk Celebi 
Authored: Thu Feb 11 14:58:35 2016 +0100
Committer: Ufuk Celebi 
Committed: Thu Feb 11 20:49:28 2016 +0100

--
 .../flink/runtime/jobmanager/JobManager.scala   | 22 +++-
 .../JobManagerLeaderElectionTest.java   |  6 +-
 .../runtime/testingUtils/TestingCluster.scala   |  6 --
 .../testingUtils/TestingJobManager.scala|  8 ---
 .../runtime/testingUtils/TestingUtils.scala |  3 ++-
 .../flink/yarn/TestingYarnJobManager.scala  |  8 ---
 .../org/apache/flink/yarn/YarnJobManager.scala  |  8 ---
 7 files changed, 38 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
--
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d96575f..78612c0 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -113,7 +113,8 @@ class JobManager(
 protected val timeout: FiniteDuration,
 protected val leaderElectionService: LeaderElectionService,
 protected val submittedJobGraphs : SubmittedJobGraphStore,
-protected val checkpointRecoveryFactory : CheckpointRecoveryFactory)
+protected val checkpointRecoveryFactory : CheckpointRecoveryFactory,
+protected val savepointStore: SavepointStore)
   extends FlinkActor
  with LeaderSessionMessageFilter // mixin order is important, we want 
filtering after logging
   with LogMessages // mixin order is important, we want first logging
@@ -151,9 +152,6 @@ class JobManager(
   val webMonitorPort : Int = flinkConfiguration.getInteger(
 ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1)
 
-  protected val savepointStore : SavepointStore =
-SavepointStoreFactory.createFromConfig(flinkConfiguration)
-
   /**
* Run when the job manager is started. Simply logs an informational message.
* The method also starts the leader election service.
@@ -2040,7 +2038,8 @@ object JobManager {
 Int, // number of archived jobs
 LeaderElectionService,
 SubmittedJobGraphStore,
-CheckpointRecoveryFactory) = {
+CheckpointRecoveryFactory,
+SavepointStore) = {
 
 val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
 
@@ -2078,8 +2077,6 @@ object JobManager {
   }
   }
 
-
-
 var blobServer: BlobServer = null
 var instanceManager: InstanceManager = null
 var scheduler: FlinkScheduler = null
@@ -2140,6 +2137,8 @@ object JobManager {
 new ZooKeeperCheckpointRecoveryFactory(client, configuration))
   }
 
+val savepointStore = SavepointStoreFactory.createFromConfig(configuration)
+
 (executorService,
   instanceManager,
   scheduler,
@@ -2150,7 +2149,8 @@ object JobManager {
   archiveCount,
   leaderElectionService,
   submittedJobGraphs,
-  checkpointRecoveryFactory)
+  checkpointRecoveryFactory,
+  savepointStore)
   }
 
   /**
@@ -2212,7 +2212,8 @@ object JobManager {
 archiveCount,
 leaderElectionService,
 submittedJobGraphs,
-checkpointRecoveryFactory) = createJobManagerComponents(
+checkpointRecoveryFactory,
+savepointStore) = createJobManagerComponents(
   configuration,
   None)
 
@@ -2237,7 +2238,8 @@ object JobManager {
   timeout,
   leaderElectionService,
   submittedJobGraphs,
-  checkpointRecoveryFactory)
+  checkpointRecoveryFactory,
+  savepointStore)
 
 val jobManager: ActorRef = jobMangerActorName match {
   case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)

http://git-wip-us.apache.org/repos/asf/flink/blob/dcea46e8/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index f50a0a0..73c7646 100644
--- 

[21/50] [abbrv] flink git commit: [hotfix] [tests] Log retry rule failures on warn level

2016-02-12 Thread fhueske
[hotfix] [tests] Log retry rule failures on warn level
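
For reference, the rule under change is attached to flaky tests roughly like
this (sketch; the @RetryOnFailure annotation belongs to the same Flink test
utilities, but treat the exact API as an assumption):

import org.apache.flink.testutils.junit.RetryOnFailure;
import org.apache.flink.testutils.junit.RetryRule;
import org.junit.Rule;
import org.junit.Test;

public class SomeFlakyTest {

    // The rule wraps every test method and re-runs it on failure.
    @Rule
    public final RetryRule retryRule = new RetryRule();

    // Retried up to 3 extra times; each failed attempt is now logged at
    // WARN level instead of DEBUG, so retries show up in default CI logs.
    @Test
    @RetryOnFailure(times = 3)
    public void testSometimesFlaky() throws Exception {
        // ... test body ...
    }
}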


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/756cbaff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/756cbaff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/756cbaff

Branch: refs/heads/tableOnCalcite
Commit: 756cbafff1fd25f67268ca84b62c8a479156bf88
Parents: 3a643c0
Author: Ufuk Celebi 
Authored: Tue Feb 9 11:25:37 2016 +0100
Committer: Stephan Ewen 
Committed: Wed Feb 10 15:27:28 2016 +0100

--
 .../test/java/org/apache/flink/testutils/junit/RetryRule.java| 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/756cbaff/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java
--
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java 
b/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java
index a4aff86..2b3a37a 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/junit/RetryRule.java
@@ -113,7 +113,7 @@ public class RetryRule implements TestRule {
break; // success
}
catch (Throwable t) {
-   LOG.debug(String.format("Test run 
failed (%d/%d).",
+   LOG.warn(String.format("Test run failed 
(%d/%d).",
currentRun, 
timesOnFailure + 1), t);
 
// Throw the failure if retried too 
often
@@ -156,7 +156,7 @@ public class RetryRule implements TestRule {
break; // success
}
catch (Throwable t) {
-   LOG.debug(String.format("Test run 
failed (%d/%d).", currentRun, timesOnFailure + 1), t);
+   LOG.warn(String.format("Test run failed 
(%d/%d).", currentRun, timesOnFailure + 1), t);
 
if 
(!exceptionClass.isAssignableFrom(t.getClass()) || currentRun >= 
timesOnFailure) {
// Throw the failure if retried 
too often, or if it is the wrong exception



[44/50] [abbrv] flink git commit: [FLINK-3226] Implement a CodeGenerator for an efficient translation to DataSet programs

2016-02-12 Thread fhueske
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala
deleted file mode 100644
index b706e6d..000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.codegen
-
-import java.io.StringReader
-
-import org.apache.flink.api.common.functions.FlatJoinFunction
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.codegen.Indenter._
-import org.apache.flink.api.table.expressions.{Expression, NopExpression}
-import org.slf4j.LoggerFactory
-
-/**
- * Code generator for assembling the result of a binary operation.
- */
-class GenerateJoin[L, R, O](
-leftTypeInfo: CompositeType[L],
-rightTypeInfo: CompositeType[R],
-resultTypeInfo: CompositeType[O],
-predicate: Expression,
-outputFields: Seq[Expression],
-cl: ClassLoader,
-config: TableConfig)
-  extends GenerateResultAssembler[FlatJoinFunction[L, R, O]](
-Seq(("in0", leftTypeInfo), ("in1", rightTypeInfo)),
-cl = cl,
-config) {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-
-  override protected def generateInternal(): FlatJoinFunction[L, R, O] = {
-
-val leftTpe = typeTermForTypeInfo(leftTypeInfo)
-val rightTpe = typeTermForTypeInfo(rightTypeInfo)
-val resultTpe = typeTermForTypeInfo(resultTypeInfo)
-
-
-val resultCode = createResult(resultTypeInfo, outputFields, o => 
s"coll.collect($o);")
-
-val generatedName = freshName("GeneratedJoin")
-
-
-val code = predicate match {
-  case n: NopExpression =>
-// Janino does not support generics, that's why we need
-// manual casting here
-if (nullCheck) {
-  j"""
-public class $generatedName
-implements 
org.apache.flink.api.common.functions.FlatFlatJoinFunction {
-
-  ${reuseCode(resultTypeInfo)}
-
-  public org.apache.flink.api.table.TableConfig config = null;
-  public $generatedName(org.apache.flink.api.table.TableConfig config) 
{
-this.config = config;
-  }
-
-  public void join(Object _in0, Object _in1, 
org.apache.flink.util.Collector coll) {
-$leftTpe in0 = ($leftTpe) _in0;
-$rightTpe in1 = ($rightTpe) _in1;
-
-$resultCode
-  }
-}
-  """
-} else {
-  j"""
-public class $generatedName
-implements org.apache.flink.api.common.functions.FlatJoinFunction {
-
-  ${reuseCode(resultTypeInfo)}
-
-  public org.apache.flink.api.table.TableConfig config = null;
-  public $generatedName(org.apache.flink.api.table.TableConfig config) 
{
-this.config = config;
-  }
-
-  public void join(Object _in0, Object _in1, 
org.apache.flink.util.Collector coll) {
-$leftTpe in0 = ($leftTpe) _in0;
-$rightTpe in1 = ($rightTpe) _in1;
-
-$resultCode
-  }
-}
-  """
-}
-
-  case _ =>
-val pred = generateExpression(predicate)
-// Janino does not support generics, that's why we need
-// manual casting here
-if (nullCheck) {
-  j"""
-public class $generatedName
-implements 
org.apache.flink.api.common.functions.FlatFlatJoinFunction {
-
-  ${reuseCode(resultTypeInfo)}
-
-  org.apache.flink.api.table.TableConfig config = null;
-
-  public $generatedName(org.apache.flink.api.table.TableConfig config) 
{
-this.config = config;
-${reuseInitCode()}
-  }
-
-  public void join(Object _in0, Object _in1, 
org.apache.flink.util.Collector coll) {
-$l
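
The deleted generator handed source strings like the ones above to Janino,
which compiles raw (non-generic) Java; that is why the generated functions
cast their Object arguments by hand. A minimal sketch of that
compile-and-instantiate pattern (assuming Janino's SimpleCompiler API; this is
not the deleted class itself):

import java.io.StringReader;
import org.apache.flink.api.table.TableConfig;
import org.codehaus.janino.SimpleCompiler;

public class CodegenSketch {

    // Compile a generated source string at runtime and instantiate the class
    // through its TableConfig constructor, as the generated classes declare.
    public static Object compileAndInstantiate(
            String generatedSource, String className, TableConfig config) throws Exception {

        SimpleCompiler compiler = new SimpleCompiler();
        compiler.setParentClassLoader(Thread.currentThread().getContextClassLoader());
        compiler.cook(new StringReader(generatedSource));

        Class<?> clazz = compiler.getClassLoader().loadClass(className);
        return clazz.getConstructor(TableConfig.class).newInstance(config);
    }
}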

[28/50] [abbrv] flink git commit: [FLINK-3391] [tests] Fix typo in test config

2016-02-12 Thread fhueske
[FLINK-3391] [tests] Fix typo in test config


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed7d3da7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed7d3da7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed7d3da7

Branch: refs/heads/tableOnCalcite
Commit: ed7d3da7f93769a858f6d4d92c41d2aef69be013
Parents: cf3ae88
Author: Ufuk Celebi 
Authored: Thu Feb 11 16:13:26 2016 +0100
Committer: Ufuk Celebi 
Committed: Thu Feb 11 16:13:26 2016 +0100

--
 .../java/org/apache/flink/test/checkpointing/SavepointITCase.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/ed7d3da7/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
--
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 46c0453..58f8225 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -441,7 +441,7 @@ public class SavepointITCase extends TestLogger {
LOG.info("Created temporary savepoint directory: " + 
savepointDir + ".");
 
config.setString(ConfigConstants.STATE_BACKEND, 
"filesystem");
-   
config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "fileystem");
+   
config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
 

config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
checkpointDir.toURI().toString());



[06/50] [abbrv] flink git commit: [FLINK-3355] [rocksdb backend] Allow passing options to the RocksDB backend.

2016-02-12 Thread fhueske
[FLINK-3355] [rocksdb backend] Allow passing options to the RocksDB backend.

This also cleans up the generics in the RocksDB state classes.

This closes #1608
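
A sketch of what passing options looks like with the new hook. The
setOptions/createOptions names and the checkpoint-URI constructor are
assumptions inferred from the file list, not verified signatures:

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.rocksdb.Options;

public class RocksDBOptionsSketch {
    public static void main(String[] args) throws Exception {
        // Checkpoint URI is a placeholder.
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");

        // The factory is shipped to the task managers, so it should be serializable.
        backend.setOptions(new OptionsFactory() {
            @Override
            public Options createOptions(Options currentOptions) {
                // These setters exist on org.rocksdb.Options.
                return currentOptions
                        .setMaxOpenFiles(1024)
                        .setUseFsync(false);
            }
        });
    }
}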


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ee16794
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ee16794
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ee16794

Branch: refs/heads/tableOnCalcite
Commit: 9ee16794909d18aa84e8d0b738a6a447d11e6eeb
Parents: 28c6254
Author: Stephan Ewen 
Authored: Mon Feb 8 19:55:29 2016 +0100
Committer: Stephan Ewen 
Committed: Tue Feb 9 11:03:09 2016 +0100

--
 .../streaming/state/AbstractRocksDBState.java   | 113 +--
 .../contrib/streaming/state/OptionsFactory.java |  31 +
 .../streaming/state/RocksDBListState.java   |  68 ++-
 .../streaming/state/RocksDBReducingState.java   |  86 +++---
 .../streaming/state/RocksDBStateBackend.java|  76 +++--
 .../streaming/state/RocksDBValueState.java  |  74 ++--
 6 files changed, 273 insertions(+), 175 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/9ee16794/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
--
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 783332c..05e15e8 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -1,36 +1,38 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.commons.io.FileUtils;
+
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.state.KvStateSnapshot;
 import org.apache.flink.util.HDFSCopyFromLocal;
 import org.apache.flink.util.HDFSCopyToLocal;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+
 import org.rocksdb.BackupEngine;
 import org.rocksdb.BackupableDBOptions;
 import org.rocksdb.Env;
@@ -38,7 +40,7 @@ import org.rocksdb.Options;
 import org.rocksdb.RestoreOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
-import org.rocksdb.StringAppendOperator;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,10 +62,9 @@ import static java.util.Objects.requireNonNull;
  * @param <N> The type of the namespace.
  * @param <S> The type of {@link State}.
  * @param <SD> The type of {@link StateDescriptor}.
- * @param <Backend> The type of the backend that snapshots this key/value state.
  */
-public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend>
-	implements KvState<K, N, S, SD, Backend>, State {
+public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S>>
+	implements KvState<K, N, S, SD, RocksDBStateBackend>, State {
 
pr

[1/2] flink-web git commit: Update website for Flink 0.10.2

2016-02-12 Thread uce
Repository: flink-web
Updated Branches:
  refs/heads/asf-site ffe31bf53 -> 1e07e82a9


http://git-wip-us.apache.org/repos/asf/flink-web/blob/1e07e82a/content/news/2014/08/26/release-0.6.html
--
diff --git a/content/news/2014/08/26/release-0.6.html 
b/content/news/2014/08/26/release-0.6.html
index 602c685..01c634f 100644
--- a/content/news/2014/08/26/release-0.6.html
+++ b/content/news/2014/08/26/release-0.6.html
@@ -83,9 +83,9 @@
   
 
 Latest 
Release (Stable)
-http://ci.apache.org/projects/flink/flink-docs-release-0.10";>0.10.1 
Documentation
-http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java"; 
class="active">0.10.1 Javadocs
-http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/scala/index.html";
 class="active">0.10.1 ScalaDocs
+http://ci.apache.org/projects/flink/flink-docs-release-0.10";>0.10.2 
Documentation
+http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java"; 
class="active">0.10.2 Javadocs
+http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/scala/index.html";
 class="active">0.10.2 ScalaDocs
 
 
 

http://git-wip-us.apache.org/repos/asf/flink-web/blob/1e07e82a/content/news/2014/09/26/release-0.6.1.html
--
diff --git a/content/news/2014/09/26/release-0.6.1.html 
b/content/news/2014/09/26/release-0.6.1.html
index 9f6c7e7..c6f508c 100644
--- a/content/news/2014/09/26/release-0.6.1.html
+++ b/content/news/2014/09/26/release-0.6.1.html
@@ -83,9 +83,9 @@
   
 
 Latest 
Release (Stable)
-http://ci.apache.org/projects/flink/flink-docs-release-0.10";>0.10.1 
Documentation
-http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java"; 
class="active">0.10.1 Javadocs
-http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/scala/index.html";
 class="active">0.10.1 ScalaDocs
+http://ci.apache.org/projects/flink/flink-docs-release-0.10";>0.10.2 
Documentation
+http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java"; 
class="active">0.10.2 Javadocs
+http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/scala/index.html";
 class="active">0.10.2 ScalaDocs
 
 
 

http://git-wip-us.apache.org/repos/asf/flink-web/blob/1e07e82a/content/news/2014/10/03/upcoming_events.html
--
diff --git a/content/news/2014/10/03/upcoming_events.html 
b/content/news/2014/10/03/upcoming_events.html
index 01744b2..b1207b3 100644
--- a/content/news/2014/10/03/upcoming_events.html
+++ b/content/news/2014/10/03/upcoming_events.html
@@ -83,9 +83,9 @@
   
 
 Latest 
Release (Stable)
-http://ci.apache.org/projects/flink/flink-docs-release-0.10";>0.10.1 
Documentation
-http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java"; 
class="active">0.10.1 Javadocs
-http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/scala/index.html";
 class="active">0.10.1 ScalaDocs
+http://ci.apache.org/projects/flink/flink-docs-release-0.10";>0.10.2 
Documentation
+http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java"; 
class="active">0.10.2 Javadocs
+http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/scala/index.html";
 class="active">0.10.2 ScalaDocs
 
 
 

http://git-wip-us.apache.org/repos/asf/flink-web/blob/1e07e82a/content/news/2014/11/04/release-0.7.0.html
--
diff --git a/content/news/2014/11/04/release-0.7.0.html 
b/content/news/2014/11/04/release-0.7.0.html
index 9d97cb6..c7a3bf9 100644
--- a/content/news/2014/11/04/release-0.7.0.html
+++ b/content/news/2014/11/04/release-0.7.0.html
@@ -83,9 +83,9 @@
   
 
 Latest 
Release (Stable)
-http://ci.apache.org/projects/flink/flink-docs-release-0.10";>0.10.1 
Documentation
-http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java"; 
class="active">0.10.1 Javadocs
-http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/scala/index.html";
 class="active">0.10.1 ScalaDocs
+http://ci.apache.org/projects/flink/flink-docs-release-0.10";>0.10.2 
Documentation
+http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java"; 
class="active">0.10.2 Javadocs
+http://ci.apache.org/projects/flink/flink-docs-release-0.10/api/scala/index

[2/2] flink-web git commit: Update website for Flink 0.10.2

2016-02-12 Thread uce
Update website for Flink 0.10.2


Project: http://git-wip-us.apache.org/repos/asf/flink-web/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink-web/commit/1e07e82a
Tree: http://git-wip-us.apache.org/repos/asf/flink-web/tree/1e07e82a
Diff: http://git-wip-us.apache.org/repos/asf/flink-web/diff/1e07e82a

Branch: refs/heads/asf-site
Commit: 1e07e82a9d050262ac1fbb1c80e60dd134a462e7
Parents: ffe31bf
Author: Ufuk Celebi 
Authored: Thu Feb 11 19:42:13 2016 +0100
Committer: Ufuk Celebi 
Committed: Thu Feb 11 19:42:13 2016 +0100

--
 _config.yml |  30 +--
 _posts/2016-02-11-release-0.10.2.md |  34 +++
 content/blog/feed.xml   |  37 +++
 content/blog/index.html |  49 ++--
 content/blog/page2/index.html   |  49 ++--
 content/blog/page3/index.html   |  49 ++--
 content/blog/page4/index.html   |  57 +++--
 content/blog/page5/index.html   |  44 +++-
 content/community.html  |   6 +-
 content/contribute-code.html|   6 +-
 content/contribute-documentation.html   |   6 +-
 content/downloads.html  |  42 ++--
 content/faq.html|   6 +-
 content/features.html   |   6 +-
 content/how-to-contribute.html  |   6 +-
 content/improve-website.html|   6 +-
 content/index.html  |  28 +--
 content/material.html   |   6 +-
 content/news/2012/08/21/release02.html  |   6 +-
 content/news/2012/10/15/icde2013.html   |   6 +-
 content/news/2012/11/12/btw2013demo.html|   6 +-
 content/news/2012/11/21/previewICDE2013.html|   6 +-
 content/news/2013/03/27/www-demo-paper.html |   6 +-
 content/news/2013/10/21/cikm2013-paper.html |   6 +-
 .../2013/12/13/humboldt-innovation-award.html   |   6 +-
 .../2014/01/10/stratosphere-hadoop-summit.html  |   6 +-
 .../news/2014/01/12/0.4-migration-guide.html|   6 +-
 .../2014/01/13/stratosphere-release-0.4.html|   6 +-
 .../26/optimizer_plan_visualization_tool.html   |   6 +-
 content/news/2014/01/28/querying_mongodb.html   |   6 +-
 .../18/amazon-elastic-mapreduce-cloud-yarn.html |   6 +-
 ...stratosphere-google-summer-of-code-2014.html |   6 +-
 .../16/stratosphere-goes-apache-incubator.html  |   6 +-
 content/news/2014/05/31/release-0.5.html|   6 +-
 content/news/2014/08/26/release-0.6.html|   6 +-
 content/news/2014/09/26/release-0.6.1.html  |   6 +-
 content/news/2014/10/03/upcoming_events.html|   6 +-
 content/news/2014/11/04/release-0.7.0.html  |   6 +-
 .../news/2014/11/18/hadoop-compatibility.html   |   6 +-
 content/news/2015/01/06/december-in-flink.html  |   6 +-
 content/news/2015/01/21/release-0.8.html|   6 +-
 content/news/2015/02/04/january-in-flink.html   |   6 +-
 content/news/2015/02/09/streaming-example.html  |   6 +-
 .../news/2015/03/02/february-2015-in-flink.html |   6 +-
 .../peeking-into-Apache-Flinks-Engine-Room.html |   6 +-
 content/news/2015/04/07/march-in-flink.html |   6 +-
 .../2015/04/13/release-0.9.0-milestone1.html|   6 +-
 .../05/11/Juggling-with-Bits-and-Bytes.html |   6 +-
 .../news/2015/05/14/Community-update-April.html |   6 +-
 .../announcing-apache-flink-0.9.0-release.html  |   6 +-
 .../2015/08/24/introducing-flink-gelly.html |   6 +-
 content/news/2015/09/01/release-0.9.1.html  |   6 +-
 content/news/2015/09/03/flink-forward.html  |   6 +-
 content/news/2015/09/16/off-heap-memory.html|   6 +-
 content/news/2015/11/16/release-0.10.0.html |   6 +-
 content/news/2015/11/27/release-0.10.1.html |   6 +-
 .../news/2015/12/04/Introducing-windows.html|   6 +-
 .../news/2015/12/11/storm-compatibility.html|   6 +-
 content/news/2015/12/18/a-year-in-review.html   |   6 +-
 content/news/2016/02/11/release-0.10.2.html | 236 +++
 content/privacy-policy.html |   6 +-
 content/project.html|   6 +-
 content/q/quickstart-scala.sh   |   2 +-
 content/q/quickstart.sh |   2 +-
 q/quickstart-scala.sh   |   2 +-
 q/quickstart.sh |   2 +-
 66 files changed, 682 insertions(+), 287 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink-web/blob/1e07e82a/_config.yml
--
diff --git a/_config.yml b/_config.yml
index 88c2571..ef628a0 100644
--- a/_config.yml
+++ b/_config.yml
@@ -9,10 +9,10 @@ url: http://flink.apache.org
 
 DOCS_BASE_URL: http://ci.apache.org/projects/flink/
 
-FLINK_VERSION_STABLE: 0.10.1
-FLINK_VERSION_HADOOP

flink git commit: [FLINK-3270] Add Kafka example

2016-02-12 Thread rmetzger
Repository: flink
Updated Branches:
  refs/heads/master 1dee62b4b -> b6bfcf008


[FLINK-3270] Add Kafka example

This closes #1533
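
The gist of the added read-side example, as a compact sketch (topic name and
connection settings are placeholders):

import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ReadFromKafkaSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        properties.setProperty("zookeeper.connect", "localhost:2181"); // the 0.8 consumer needs ZK
        properties.setProperty("group.id", "example-group");

        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), properties));

        stream.print();
        env.execute("Read from Kafka sketch");
    }
}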


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b6bfcf00
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b6bfcf00
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b6bfcf00

Branch: refs/heads/master
Commit: b6bfcf008e20eb4d4a3e81bedf7eaf871f121d4c
Parents: 1dee62b
Author: Robert Metzger 
Authored: Thu Jan 21 11:42:12 2016 +0100
Committer: Robert Metzger 
Committed: Fri Feb 12 15:02:28 2016 +0100

--
 flink-examples/flink-examples-streaming/pom.xml | 51 ++
 .../streaming/examples/kafka/ReadFromKafka.java | 63 +
 .../examples/kafka/WriteIntoKafka.java  | 72 
 .../kafka/examples/ReadFromKafka.java   | 56 ---
 .../kafka/examples/WriteIntoKafka.java  | 70 ---
 5 files changed, 186 insertions(+), 126 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b6bfcf00/flink-examples/flink-examples-streaming/pom.xml
--
diff --git a/flink-examples/flink-examples-streaming/pom.xml 
b/flink-examples/flink-examples-streaming/pom.xml
index ba49dc5..3ea3276 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -61,6 +61,12 @@ under the License.
 

org.apache.flink
+   flink-connector-kafka-0.8_2.10
+   ${project.version}
+   
+
+   
+   org.apache.flink
flink-streaming-java_2.10
${project.version}
test
@@ -522,6 +528,51 @@ under the License.



+
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   fat-jar-kafka-example
+   package
+   
+   shade
+   
+   
+   
false
+   
false
+   
false
+   
+   
+   
org.apache.flink.streaming.examples.kafka.ReadFromKafka
+   
+   
+   
Kafka
+   
+   
+   
+   
*
+   

+   
org/apache/flink/streaming/examples/kafka/**
+   
org/apache/flink/streaming/**
+   
org/apache/kafka/**
+   
org/apache/curator/**
+   
org/apache/zookeeper/**
+   
org/apache/jute/**
+   
org/I0Itec/**
+   
jline/**
+   
com/yammer/**
+   
kafka/**
+   

+   
+   
+   
+   
+   
+  

[4/4] flink git commit: [hotfix] Minor code cleanups in AbstractStateBackend

2016-02-12 Thread sewen
[hotfix] Minor code cleanups in AbstractStateBackend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d93b154d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d93b154d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d93b154d

Branch: refs/heads/master
Commit: d93b154d41045e89dcc2238885db83fd947cd104
Parents: b6bfcf0
Author: Stephan Ewen 
Authored: Thu Feb 11 22:13:37 2016 +0100
Committer: Stephan Ewen 
Committed: Fri Feb 12 18:51:01 2016 +0100

--
 .../apache/flink/runtime/state/AbstractStateBackend.java  | 10 +-
 1 file changed, 5 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/d93b154d/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index e989af3..d0c4f82 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -123,7 +123,7 @@ public abstract class AbstractStateBackend implements 
java.io.Serializable {
	 * @param <N> The type of the namespace.
	 * @param <T> The type of the value that the {@code ValueState} can store.
	 */
-	abstract protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception;
+	protected abstract <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception;
 
/**
 * Creates and returns a new {@link ListState}.
@@ -134,7 +134,7 @@ public abstract class AbstractStateBackend implements 
java.io.Serializable {
	 * @param <N> The type of the namespace.
	 * @param <T> The type of the values that the {@code ListState} can store.
	 */
-	abstract protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception;
+	protected abstract <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception;
 
/**
 * Creates and returns a new {@link ReducingState}.
@@ -145,7 +145,7 @@ public abstract class AbstractStateBackend implements 
java.io.Serializable {
	 * @param <N> The type of the namespace.
	 * @param <T> The type of the values that the {@code ListState} can store.
	 */
-	abstract protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception;
+	protected abstract <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception;
 
/**
 * Sets the current key that is used for partitioned state.
@@ -170,7 +170,7 @@ public abstract class AbstractStateBackend implements 
java.io.Serializable {
 *
 * @param stateDescriptor The state identifier for the state. This 
contains name
 *   and can create a default state value.
-	 * @param <K> The type of the key.
+
	 * @param <N> The type of the namespace.
	 * @param <S> The type of the state.
 *
@@ -179,7 +179,7 @@ public abstract class AbstractStateBackend implements 
java.io.Serializable {
 * @throws Exception Exceptions may occur during initialization of the 
state and should be forwarded.
 */
@SuppressWarnings({"rawtypes", "unchecked"})
-   public  S getPartitionedState(final N namespace, 
final TypeSerializer namespaceSerializer, final StateDescriptor 
stateDescriptor) throws Exception {
+   public  S getPartitionedState(final N namespace, 
final TypeSerializer namespaceSerializer, final StateDescriptor 
stateDescriptor) throws Exception {
 
if (keySerializer == null) {
throw new Exception("State key serializer has not been 
configured in the config. " +



[2/4] flink git commit: [FLINK-2991] Add Folding State and use in WindowOperator

2016-02-12 Thread sewen
http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 88e619a..0d01733 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -36,10 +37,9 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import 
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.functions.windowing.FoldWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
+import 
org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction;
-import 
org.apache.flink.streaming.api.functions.windowing.ReduceIterableWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
@@ -71,7 +71,7 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
  * If an {@link Evictor} is specified it will be used to evict elements from 
the window after
  * evaluation was triggered by the {@code Trigger} but before the actual 
evaluation of the window.
  * When using an evictor window performance will degrade significantly, since
- * pre-aggregation of window results cannot be used.
+ * incremental aggregation of window results cannot be used.
  *
  * 
  * Note that the {@code WindowedStream} is purely an API construct, during runtime
runtime
@@ -120,7 +120,7 @@ public class WindowedStream {
 *
 * 
 * Note: When using an evictor window performance will degrade 
significantly, since
-* pre-aggregation of window results cannot be used.
+* incremental aggregation of window results cannot be used.
 */
@PublicEvolving
public WindowedStream evictor(Evictor 
evictor) {
@@ -137,13 +137,14 @@ public class WindowedStream {
 * Applies a reduce function to the window. The window function is 
called for each evaluation
 * of the window for each key individually. The output of the reduce 
function is interpreted
 * as a regular non-windowed stream.
+*
 * 
-* This window will try and pre-aggregate data as much as the window 
policies permit. For example,
-* tumbling time windows can perfectly pre-aggregate the data, meaning 
that only one element per
-* key is stored. Sliding time windows will pre-aggregate on the 
granularity of the slide interval,
+* This window will try and incrementally aggregate data as much as the 
window policies permit.
+* For example, tumbling time windows can aggregate the data, meaning 
that only one element per
+* key is stored. Sliding time windows will aggregate on the 
granularity of the slide interval,
 * so a few elements are stored per key (one per slide interval).
-* Custom windows may not be able to pre-aggregate, or may need to 
store extra values in an
-* aggregation tree.
+* Custom windows may not be able to incrementally aggregate, or may 
need to store extra values
+* in an aggregation tree.
 * 
 * @param function The reduce function.
 * @return The data stream that is the result of applying the reduce 
function to the window. 
@@ -159,48 +160,14 @@ public class WindowedStream {
function = input.getExecutionEnvironment().clean(function);
 
String callLocation = Utils.getCallLocationName();
-   String udfName = "Reduce at " + callLocation;
+   String udfNa

[1/4] flink git commit: [FLINK-2991] Adjust RocksDB Folding State to latest RocksDBStateBackend

2016-02-12 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/master b6bfcf008 -> e927ec0be


[FLINK-2991] Adjust RocksDB Folding State to latest RocksDBStateBackend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e927ec0b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e927ec0b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e927ec0b

Branch: refs/heads/master
Commit: e927ec0bee4cf951e49b838538efa679e1af13e2
Parents: 94cba89
Author: Stephan Ewen 
Authored: Fri Feb 12 15:35:08 2016 +0100
Committer: Stephan Ewen 
Committed: Fri Feb 12 18:51:01 2016 +0100

--
 .../streaming/state/AbstractRocksDBState.java   |  6 +-
 .../streaming/state/RocksDBFoldingState.java| 89 +++-
 .../streaming/state/RocksDBStateBackend.java|  3 +-
 .../runtime/state/AbstractStateBackend.java |  2 +-
 4 files changed, 57 insertions(+), 43 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/e927ec0b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
--
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 05e15e8..7d3172a 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -95,7 +95,8 @@ public abstract class AbstractRocksDBState
-	protected AbstractRocksDBState(TypeSerializer<K> keySerializer,
+	protected AbstractRocksDBState(
+			TypeSerializer<K> keySerializer,
			TypeSerializer<N> namespaceSerializer,
			File dbPath,
			String checkpointPath,
@@ -139,7 +140,8 @@ public abstract class AbstractRocksDBState
-	protected AbstractRocksDBState(TypeSerializer<K> keySerializer,
+	protected AbstractRocksDBState(
+			TypeSerializer<K> keySerializer,
			TypeSerializer<N> namespaceSerializer,
			File dbPath,
			String checkpointPath,

http://git-wip-us.apache.org/repos/asf/flink/blob/e927ec0b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
--
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index 7e4e573..d7b75bd 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -21,13 +21,12 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
+
+import org.rocksdb.Options;
 import org.rocksdb.RocksDBException;
 
 import java.io.ByteArrayInputStream;
@@ -39,16 +38,15 @@ import java.net.URI;
 import static java.util.Objects.requireNonNull;
 
 /**
- * {@link ReducingState} implementation that stores state in RocksDB.
+ * {@link FoldingState} implementation that stores state in RocksDB.
  *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <T> The type of the values that can be folded into the state.
  * @param <ACC> The type of the value in the folding state.
- * @param <Backend> The type of the backend that snapshots this key/value state.
  */
-public class RocksDBFoldingState<K, N, T, ACC, Backend extends AbstractStateBackend>
-	extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend>
+public class RocksDBFoldingState<K, N, T, ACC>
+	extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>>
	implements FoldingState<T, ACC> {
 
/** Serializer for the values */
@@ -70,12 +68,16 @@ public class RocksDBFoldingState keySerializer,
-   TypeSerializer namespaceSeriali

[3/4] flink git commit: [FLINK-2991] Add Folding State and use in WindowOperator

2016-02-12 Thread sewen
[FLINK-2991] Add Folding State and use in WindowOperator

This enables efficient incremental aggregation of window folds.

This also adds:
- WindowedStream.apply(initVal, foldFunction, windowFunction)
- AllWindowedStream.apply(initVal, foldFunction, windowFunction)

This closes #1605
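
A sketch of the new fold-plus-window-function variant (stream shape and window
size are illustrative; the apply signature follows the description above, and
the import paths assume the streaming API of this release):

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class FoldApplySketch {

    public static DataStream<String> sumPerKey(DataStream<Tuple2<String, Long>> events) {
        return events
                .keyBy(0)
                .timeWindow(Time.seconds(10))
                .apply(
                        0L, // initial fold value
                        new FoldFunction<Tuple2<String, Long>, Long>() {
                            @Override
                            public Long fold(Long acc, Tuple2<String, Long> value) {
                                // Incremental: one accumulator per key and window,
                                // no buffering of all window elements.
                                return acc + value.f1;
                            }
                        },
                        new WindowFunction<Long, String, Tuple, TimeWindow>() {
                            @Override
                            public void apply(Tuple key, TimeWindow window,
                                    Iterable<Long> folded, Collector<String> out) {
                                // The iterable holds exactly one element: the folded result.
                                out.collect(key + ": " + folded.iterator().next());
                            }
                        });
    }
}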


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94cba899
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94cba899
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94cba899

Branch: refs/heads/master
Commit: 94cba8998c726092e2cc80fd022ca40bf0c38ec2
Parents: d93b154
Author: Aljoscha Krettek 
Authored: Mon Feb 8 14:56:19 2016 +0100
Committer: Stephan Ewen 
Committed: Fri Feb 12 18:51:01 2016 +0100

--
 .../streaming/state/RocksDBFoldingState.java| 187 ++
 .../streaming/state/RocksDBListState.java   |   9 +-
 .../streaming/state/RocksDBReducingState.java   |  14 +-
 .../streaming/state/RocksDBStateBackend.java|  20 +-
 .../streaming/state/RocksDBValueState.java  |  14 +-
 .../contrib/streaming/state/DbStateBackend.java |  18 ++
 .../flink/api/common/state/FoldingState.java|  37 
 .../common/state/FoldingStateDescriptor.java| 108 ++
 .../flink/api/common/state/StateBackend.java|   8 +
 .../flink/api/common/state/StateDescriptor.java |   2 +-
 .../runtime/state/AbstractStateBackend.java |  20 ++
 .../runtime/state/GenericFoldingState.java  | 133 +
 .../flink/runtime/state/GenericListState.java   |   6 +
 .../runtime/state/GenericReducingState.java |   7 +
 .../state/filesystem/FsFoldingState.java| 145 ++
 .../state/filesystem/FsStateBackend.java|   7 +
 .../runtime/state/memory/MemFoldingState.java   | 118 +++
 .../state/memory/MemoryStateBackend.java|   7 +
 .../runtime/state/StateBackendTestBase.java | 105 ++
 .../api/datastream/AllWindowedStream.java   | 103 --
 .../api/datastream/WindowedStream.java  | 198 +--
 .../windowing/FoldAllWindowFunction.java|  92 -
 .../windowing/FoldApplyAllWindowFunction.java   |  95 +
 .../windowing/FoldApplyWindowFunction.java  |  95 +
 .../functions/windowing/FoldWindowFunction.java |  91 -
 .../windowing/PassThroughAllWindowFunction.java |  30 +++
 .../windowing/PassThroughWindowFunction.java|  30 +++
 .../windowing/ReduceAllWindowFunction.java  |  30 ---
 .../windowing/ReduceWindowFunction.java |  30 ---
 .../ReduceWindowFunctionWithWindow.java |  31 ---
 .../operators/FoldApplyWindowFunctionTest.java  | 143 ++
 .../api/operators/FoldWindowFunctionTest.java   | 132 -
 .../operators/windowing/WindowOperatorTest.java |  10 +-
 .../runtime/state/StateBackendITCase.java   |   8 +
 .../streaming/api/scala/AllWindowedStream.scala |  60 ++
 .../streaming/api/scala/WindowedStream.scala|  60 ++
 36 files changed, 1707 insertions(+), 496 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/94cba899/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
--
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
new file mode 100644
index 000..7e4e573
--- /dev/null
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apac

flink git commit: [FLINK-3392] [kafka] Fix unsynchronized access in ClosableBlockingQueue

2016-02-12 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/master e927ec0be -> c47cb7af1


[FLINK-3392] [kafka] Fix unsynchronized access in ClosableBlockingQueue
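
The underlying ArrayDeque is not thread-safe, so even a pure read such as
size() must run under the same lock as the mutations; otherwise a concurrent
reader can see a stale or inconsistent value. The general pattern, as a
standalone sketch (not the actual connector class):

import java.util.ArrayDeque;
import java.util.concurrent.locks.ReentrantLock;

class LockedQueue<E> {

    private final ReentrantLock lock = new ReentrantLock();
    private final ArrayDeque<E> elements = new ArrayDeque<>();

    int size() {
        lock.lock();
        try {
            // Reads need the same lock as writes: ArrayDeque makes no
            // visibility or atomicity guarantees across threads on its own.
            return elements.size();
        } finally {
            lock.unlock();
        }
    }

    void add(E element) {
        lock.lock();
        try {
            elements.addLast(element);
        } finally {
            lock.unlock();
        }
    }
}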


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c47cb7af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c47cb7af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c47cb7af

Branch: refs/heads/master
Commit: c47cb7af1474b08ee1b7b7a813d03da022015ed1
Parents: e927ec0
Author: Stephan Ewen 
Authored: Fri Feb 12 19:30:22 2016 +0100
Committer: Stephan Ewen 
Committed: Fri Feb 12 19:30:22 2016 +0100

--
 .../connectors/kafka/internals/ClosableBlockingQueue.java | 7 ++-
 1 file changed, 6 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/c47cb7af/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
--
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
index 856c2ad..23ff276 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue.java
@@ -104,7 +104,12 @@ public class ClosableBlockingQueue<E> {
 * @return The number of elements currently in the queue.
 */
public int size() {
-   return elements.size();
+   lock.lock();
+   try {
+   return elements.size();
+   } finally {
+   lock.unlock();
+   }
}
 
/**

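The fix follows a standard pattern: all mutating operations on the queue's "elements" list already run under a ReentrantLock, so even a simple read like size() must take the same lock to see a consistent, up-to-date value (the lock establishes the happens-before edge). A minimal, self-contained sketch of the pattern, not Flink's actual class:

    import java.util.ArrayDeque;
    import java.util.concurrent.locks.ReentrantLock;

    // Reduced illustration of the locking discipline used by the queue.
    public class LockedQueue<E> {
        private final ReentrantLock lock = new ReentrantLock();
        private final ArrayDeque<E> elements = new ArrayDeque<>();

        public void add(E element) {
            lock.lock();
            try {
                elements.addLast(element);
            } finally {
                lock.unlock();
            }
        }

        // Reading under the same lock guarantees visibility of writes
        // made by threads that previously held the lock.
        public int size() {
            lock.lock();
            try {
                return elements.size();
            } finally {
                lock.unlock();
            }
        }
    }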


[2/3] flink git commit: [FLINK-3352] Use HDFS Config in RocksDB Copy Utilities

2016-02-12 Thread aljoscha
[FLINK-3352] Use HDFS Config in RocksDB Copy Utilities

This also moves the utilities (HDFSCopyFromLocal and HDFSCopyToLocal) to
the RocksDB package because we would need a HDFS dependency in
flink-core otherwise.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f5d71909
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f5d71909
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f5d71909

Branch: refs/heads/master
Commit: f5d719096083504a0b5827e35c1d28e1180e5e1d
Parents: 31310cd
Author: Aljoscha Krettek 
Authored: Thu Feb 11 19:07:18 2016 +0100
Committer: Aljoscha Krettek 
Committed: Fri Feb 12 22:59:14 2016 +0100

--
 .../streaming/state/AbstractRocksDBState.java   |  63 ++---
 .../streaming/state/HDFSCopyFromLocal.java  |  57 
 .../streaming/state/HDFSCopyToLocal.java|  58 
 .../streaming/state/RocksDBListState.java   |   4 +-
 .../streaming/state/RocksDBReducingState.java   |   6 +-
 .../streaming/state/RocksDBValueState.java  |   4 +-
 .../streaming/state/HDFSCopyUtilitiesTest.java  | 140 +++
 .../apache/flink/util/HDFSCopyFromLocal.java|  48 ---
 .../org/apache/flink/util/HDFSCopyToLocal.java  |  49 ---
 9 files changed, 309 insertions(+), 120 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f5d71909/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
--
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 6e4adf5..76f05d6 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -24,12 +24,11 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.state.KvStateSnapshot;
 
-import org.apache.flink.util.HDFSCopyFromLocal;
-import org.apache.flink.util.HDFSCopyToLocal;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -46,7 +45,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URI;
 import java.util.UUID;
@@ -73,6 +74,8 @@ import static java.util.Objects.requireNonNull;
 public abstract class AbstractRocksDBState>
implements KvState, State {
 
+   private static final String HADOOP_CONF_NAME = "hadoop-conf.binary";
+
	private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBState.class);
 
/** Serializer for the keys */
@@ -96,10 +99,15 @@ public abstract class AbstractRocksDBState namespaceSerializer,
SD stateDesc) {

-   this.dbPath = dbPath;
+   this.basePath = basePath;
this.checkpointPath = checkpointPath;
this.backupUri = backupUri;
this.checkpointId = checkpointId;
@@ -339,7 +368,7 @@ public abstract class AbstractRocksDBState keySerializer,
TypeSerializer namespaceSerializer,
SD stateDesc,
-   File dbPath,
+   File basePath,
String backupPath,
String restorePath,
Options options) throws Exception;
@@ -360,13 +389,13 @@ public abstract class AbstractRocksDBState 
materialize() throws Exception {
try {
-			HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri);
+			HDFSCopyFromLocal.copyFromLocal(state.hadoopConfPath, localBackupPath, backupUri);
			return state.createRocksDBSnapshot(backupUri, checkpointId);
} catch (Exception e) {
-  
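
The net effect is that the copy utilities now receive an explicit Hadoop configuration (serialized into the hadoop-conf.binary file named above) instead of relying on built-in defaults. A minimal sketch of copying a local backup to HDFS with an explicit Configuration; the URI, paths, and config file location below are assumptions for illustration:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CopyWithConfig {
        public static void main(String[] args) throws Exception {
            // Without an explicit Configuration, FileSystem.get() falls back
            // to defaults and may miss HA namenode or security settings.
            Configuration conf = new Configuration();
            conf.addResource(new Path("/etc/hadoop/conf/core-site.xml")); // illustrative path

            URI backupUri = URI.create("hdfs://namenode:8020/flink/backups/db-1");
            FileSystem fs = FileSystem.get(backupUri, conf);

            // Copy the local RocksDB backup directory into HDFS.
            fs.copyFromLocalFile(new Path("/tmp/rocksdb-backup"), new Path(backupUri));
        }
    }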

[3/3] flink git commit: [FLINK-3359] Make RocksDB File Copies Asynchronous

2016-02-12 Thread aljoscha
[FLINK-3359] Make RocksDB File Copies Asynchronous


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31310cd6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31310cd6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31310cd6

Branch: refs/heads/master
Commit: 31310cd657381d4035cda9659d737ed440f90d26
Parents: c47cb7a
Author: Aljoscha Krettek 
Authored: Thu Feb 11 17:29:37 2016 +0100
Committer: Aljoscha Krettek 
Committed: Fri Feb 12 22:59:14 2016 +0100

--
 .../flink-statebackend-rocksdb/pom.xml  |  15 +-
 .../streaming/state/AbstractRocksDBState.java   | 161 +-
 .../streaming/state/RocksDBFoldingState.java|   2 +-
 .../streaming/state/RocksDBListState.java   |   2 +-
 .../streaming/state/RocksDBReducingState.java   |   2 +-
 .../streaming/state/RocksDBStateBackend.java|   6 +-
 .../streaming/state/RocksDBValueState.java  |   2 +-
 .../state/RocksDBAsyncKVSnapshotTest.java   | 208 +++
 .../state/RocksDBStateBackendTest.java  |   4 +-
 .../state/AsynchronousKvStateSnapshot.java  |  62 ++
 .../runtime/state/StateBackendTestBase.java | 180 ++--
 .../streaming/runtime/tasks/StreamTask.java |  26 ++-
 .../runtime/tasks/StreamTaskStateList.java  |  58 +++---
 .../tasks/OneInputStreamTaskTestHarness.java|   7 +
 .../tasks/StreamTaskAsyncCheckpointTest.java|   3 +-
 .../EventTimeWindowCheckpointingITCase.java |   6 +
 16 files changed, 584 insertions(+), 160 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/31310cd6/flink-contrib/flink-statebackend-rocksdb/pom.xml
--
diff --git a/flink-contrib/flink-statebackend-rocksdb/pom.xml b/flink-contrib/flink-statebackend-rocksdb/pom.xml
index 999c496..9c1601e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/pom.xml
+++ b/flink-contrib/flink-statebackend-rocksdb/pom.xml
@@ -57,7 +57,6 @@ under the License.
 			<artifactId>rocksdbjni</artifactId>
 			<version>4.1.0</version>
 		</dependency>
-
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-runtime_2.10</artifactId>
@@ -65,7 +64,17 @@ under the License.
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
-
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.11</version>
+		</dependency>

-
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31310cd6/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
--
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 7d3172a..6e4adf5 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -24,11 +24,12 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.state.KvStateSnapshot;
+
 import org.apache.flink.util.HDFSCopyFromLocal;
 import org.apache.flink.util.HDFSCopyToLocal;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -48,6 +49,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.util.UUID;
 
 import static java.util.Objects.requireNonNull;
 
@@ -58,6 +60,11 @@ import static java.util.Objects.requireNonNull;
  * checkpointing/restoring the database and for disposal in the {@link #dispose()} method. The
  * concrete subclasses just use the RocksDB handle to store/retrieve state.
  *
+ * State is checkpointed asynchronously. The synchronous part is drawing the actual backup
+ * from RocksDB; this is done in {@link #snapshot(long, long)}. This will return a
+ * {@link AsyncRocksDBSnapshot} that will perform the
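
In outline: snapshot(long, long) does only the cheap, synchronous part (a local RocksDB backup) and returns an object whose materialize() later performs the slow copy to distributed storage, off the task's critical path. A hedged sketch of that split; the interface and helper below are illustrative stand-ins, not Flink's API:

    import java.io.File;

    // Illustrative stand-in for an asynchronous snapshot handle.
    interface AsyncSnapshot<T> {
        T materialize() throws Exception;
    }

    class AsyncBackupSketch {
        AsyncSnapshot<String> snapshot(final File localBackupDir, final String backupUri) {
            // Synchronous part: assume the fast local backup into
            // localBackupDir has already been taken at this point.
            return new AsyncSnapshot<String>() {
                @Override
                public String materialize() throws Exception {
                    // Asynchronous part: the expensive network copy runs here,
                    // invoked later by the checkpointing machinery.
                    copyToDfs(localBackupDir, backupUri); // hypothetical helper
                    return backupUri;
                }
            };
        }

        void copyToDfs(File src, String dst) { /* elided for brevity */ }
    }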

[1/3] flink git commit: [hotfix] Fix field names in RocksDBStateBackend

2016-02-12 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/master c47cb7af1 -> 8b7caaa22


[hotfix] Fix field names in RocksDBStateBackend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b7caaa2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b7caaa2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b7caaa2

Branch: refs/heads/master
Commit: 8b7caaa22fee9a131292c1a3fdb45872d6836dbf
Parents: f5d7190
Author: Aljoscha Krettek 
Authored: Fri Feb 12 18:13:39 2016 +0100
Committer: Aljoscha Krettek 
Committed: Fri Feb 12 22:59:14 2016 +0100

--
 .../streaming/state/RocksDBStateBackend.java| 32 ++--
 1 file changed, 16 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8b7caaa2/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
--
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 72a4c58..e3b4f4d 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -83,10 +83,10 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	// DB storage directories
 
 	/** Base paths for RocksDB directory, as configured. May be null. */
-	private Path[] dbBasePaths;
+	private Path[] configuredDbBasePaths;
 
 	/** Base paths for RocksDB directory, as initialized */
-	private File[] dbStorageDirectories;
+	private File[] initializedDbBasePaths;
 
 	private int nextDirectory;
 
@@ -171,15 +171,15 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 		this.jobId = env.getJobID();
 
 		// initialize the paths where the local RocksDB files should be stored
-		if (dbBasePaths == null) {
+		if (configuredDbBasePaths == null) {
 			// initialize from the temp directories
-			dbStorageDirectories = env.getIOManager().getSpillingDirectories();
+			initializedDbBasePaths = env.getIOManager().getSpillingDirectories();
 		}
 		else {
-			List<File> dirs = new ArrayList<>(dbBasePaths.length);
+			List<File> dirs = new ArrayList<>(configuredDbBasePaths.length);
 			String errorMessage = "";
 
-			for (Path path : dbBasePaths) {
+			for (Path path : configuredDbBasePaths) {
 				File f = new File(path.toUri().getPath());
 				if (!f.exists() && !f.mkdirs()) {
 					String msg = "Local DB files directory '" + f.getAbsolutePath()
@@ -193,11 +193,11 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 			if (dirs.isEmpty()) {
 				throw new Exception("No local storage directories available. " + errorMessage);
 			} else {
-				dbStorageDirectories = dirs.toArray(new File[dirs.size()]);
+				initializedDbBasePaths = dirs.toArray(new File[dirs.size()]);
 			}
 		}
 
-		nextDirectory = new Random().nextInt(dbStorageDirectories.length);
+		nextDirectory = new Random().nextInt(initializedDbBasePaths.length);
 	}
 
 	@Override
@@ -225,15 +225,15 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	}
 
 	File[] getStoragePaths() {
-		return dbStorageDirectories;
+		return initializedDbBasePaths;
 	}
 
 	File getNextStoragePath() {
 		int ni = nextDirectory + 1;
-		ni = ni >= dbStorageDirectories.length ? 0 : ni;
+		ni = ni >= initializedDbBasePaths.length ? 0 : ni;
 		nextDirectory = ni;
 
-		return dbStorageDirectories[ni];
+		return initializedDbBasePaths[ni];
 	}
 
 	// ------------------------------------------------------------------------
 
@@ -330,7 +330,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 */
public void setDbStoragePaths(String... 
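
Beyond the rename ("configured..." for the user-supplied paths, "initialized..." for the resolved directories), getNextStoragePath() is a plain round-robin over the available disks, seeded at a random index so parallel tasks on one machine don't all start on the same disk. An equivalent, self-contained sketch (illustrative, not the backend itself):

    import java.io.File;
    import java.util.Random;

    public class RoundRobinPaths {
        private final File[] paths;
        private int next;

        public RoundRobinPaths(File[] paths) {
            this.paths = paths;
            // random start index spreads concurrent users across disks
            this.next = new Random().nextInt(paths.length);
        }

        public File nextPath() {
            next = (next + 1) % paths.length;
            return paths[next];
        }
    }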

flink git commit: [FLINK-3393] [core] ExternalProcessRunner wait to finish copying error stream

2016-02-12 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/master 8b7caaa22 -> 8e55bbd41


[FLINK-3393] [core] ExternalProcessRunner wait to finish copying error stream


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e55bbd4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e55bbd4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e55bbd4

Branch: refs/heads/master
Commit: 8e55bbd41725cd8c7f29a9e41993a22777ebdacf
Parents: 8b7caaa
Author: Greg Hogan 
Authored: Fri Feb 12 08:38:05 2016 -0500
Committer: Aljoscha Krettek 
Committed: Fri Feb 12 23:04:06 2016 +0100

--
 .../java/org/apache/flink/util/ExternalProcessRunner.java | 7 ++-
 1 file changed, 6 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8e55bbd4/flink-core/src/main/java/org/apache/flink/util/ExternalProcessRunner.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExternalProcessRunner.java b/flink-core/src/main/java/org/apache/flink/util/ExternalProcessRunner.java
index 8e4725c..b7e388c 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExternalProcessRunner.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExternalProcessRunner.java
@@ -41,6 +41,8 @@ public class ExternalProcessRunner {
 
private final Process process;
 
+   private final Thread pipeForwarder;
+
final StringWriter errorOutput = new StringWriter();
 
/**
@@ -63,7 +65,7 @@ public class ExternalProcessRunner {
 
process = new ProcessBuilder(commandList).start();
 
-		new PipeForwarder(process.getErrorStream(), errorOutput);
+		pipeForwarder = new PipeForwarder(process.getErrorStream(), errorOutput);
}
 
/**
@@ -83,6 +85,9 @@ public class ExternalProcessRunner {
try {
int returnCode = process.waitFor();
 
+   // wait to finish copying standard error stream
+   pipeForwarder.join();
+
if (returnCode != 0) {
				// determine whether we failed because of a ClassNotFoundException and forward that
				if (getErrorOutput().toString().contains("Error: Could not find or load main class " + entryPointClassName)) {

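The underlying rule: a child process's stderr is drained on a separate thread, and that thread must be joined after waitFor(), otherwise the parent may inspect the buffer while the last bytes are still being copied. A minimal, self-contained sketch of the pattern (not the actual class):

    import java.io.ByteArrayOutputStream;
    import java.io.InputStream;

    public class DrainStderr {
        public static void main(String[] args) throws Exception {
            final Process process = new ProcessBuilder("java", "-version").start();

            final ByteArrayOutputStream stderr = new ByteArrayOutputStream();
            Thread forwarder = new Thread(new Runnable() {
                @Override
                public void run() {
                    try (InputStream in = process.getErrorStream()) {
                        byte[] buf = new byte[4096];
                        int n;
                        while ((n = in.read(buf)) != -1) {
                            stderr.write(buf, 0, n);
                        }
                    } catch (Exception ignored) {
                        // the stream ends when the process exits
                    }
                }
            });
            forwarder.start();

            int returnCode = process.waitFor();
            // Without this join, stderr may still be incomplete here.
            forwarder.join();

            System.out.println("exit=" + returnCode + ", stderr: " + stderr);
        }
    }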


flink git commit: [FLINK-3107] [runtime] Defer start of checkpoint ID counter

2016-02-12 Thread uce
Repository: flink
Updated Branches:
  refs/heads/master 8e55bbd41 -> 937963e33


[FLINK-3107] [runtime] Defer start of checkpoint ID counter


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/937963e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/937963e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/937963e3

Branch: refs/heads/master
Commit: 937963e339ddd57cc65e3da8af5398600e4d9ad2
Parents: 8e55bbd
Author: Ufuk Celebi 
Authored: Fri Feb 12 22:30:13 2016 +0100
Committer: Ufuk Celebi 
Committed: Fri Feb 12 23:17:04 2016 +0100

--
 .../flink/runtime/checkpoint/SavepointCoordinator.java |  1 +
 .../runtime/checkpoint/SavepointCoordinatorTest.java   | 13 -
 2 files changed, 13 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/937963e3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
index 6ce6502..ea4b8ae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
@@ -230,6 +230,7 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 
// Reset the checkpoint ID counter
long nextCheckpointId = checkpoint.getCheckpointID();
+   checkpointIdCounter.start();
checkpointIdCounter.setCount(nextCheckpointId + 1);
LOG.info("Reset the checkpoint ID to {}", 
nextCheckpointId);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/937963e3/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
--
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
index 4f9ae60..6bbdf62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
@@ -175,6 +175,7 @@ public class SavepointCoordinatorTest {
}
}
 
+		MockCheckpointIdCounter idCounter = new MockCheckpointIdCounter();
StateStore savepointStore = new HeapStateStore<>();
 
SavepointCoordinator coordinator = createSavepointCoordinator(
@@ -184,7 +185,7 @@ public class SavepointCoordinatorTest {
triggerVertices,
ackVertices,
new ExecutionVertex[] {},
-   new MockCheckpointIdCounter(),
+   idCounter,
savepointStore);
 
		Future savepointPathFuture = coordinator.triggerSavepoint(1231273123);
@@ -213,6 +214,9 @@ public class SavepointCoordinatorTest {
// Verify all promises removed
assertEquals(0, getSavepointPromises(coordinator).size());
 
+   // Verify checkpoint ID counter started
+   assertTrue(idCounter.isStarted());
+
coordinator.shutdown();
}
 
@@ -1083,15 +1087,18 @@ public class SavepointCoordinatorTest {
 
	private static class MockCheckpointIdCounter implements CheckpointIDCounter {
 
+   private boolean started;
private long count;
private long lastReturnedCount;
 
@Override
public void start() throws Exception {
+   started = true;
}
 
@Override
public void stop() throws Exception {
+   started = false;
}
 
@Override
@@ -1108,5 +1115,9 @@ public class SavepointCoordinatorTest {
long getLastReturnedCount() {
return lastReturnedCount;
}
+
+   public boolean isStarted() {
+   return started;
+   }
}
 }
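
The invariant the new assertion pins down: a checkpoint ID counter has an explicit lifecycle, and recovery paths (such as resetting the count after a savepoint restore) must start() it before use, much as a ZooKeeper-backed counter would need to connect first. A minimal sketch of a counter enforcing that lifecycle (illustrative, not Flink's interface):

    // Illustrative counter enforcing the start-before-use lifecycle.
    public class LifecycleCounter {
        private boolean started;
        private long count;

        public void start() {
            started = true; // a real implementation would acquire resources here
        }

        public void stop() {
            started = false;
        }

        public void setCount(long newCount) {
            checkStarted();
            count = newCount;
        }

        public long getAndIncrement() {
            checkStarted();
            return count++;
        }

        private void checkStarted() {
            if (!started) {
                throw new IllegalStateException("counter used before start()");
            }
        }
    }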