[flink] branch master updated: [hotfix][kafka] Fix typo in filter property in pom.xml

2019-07-18 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 01fd52b  [hotfix][kafka] Fix typo in filter property in pom.xml
01fd52b is described below

commit 01fd52b7f2277f9553d56d08b03ab734b9700c1e
Author: Zhenghua Gao 
AuthorDate: Thu Jul 18 15:51:14 2019 +0800

[hotfix][kafka] Fix typo in filter property in pom.xml

This closes #9142
---
 flink-connectors/flink-sql-connector-kafka/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-connectors/flink-sql-connector-kafka/pom.xml b/flink-connectors/flink-sql-connector-kafka/pom.xml
index 6e7ec86..3051ad4 100644
--- a/flink-connectors/flink-sql-connector-kafka/pom.xml
+++ b/flink-connectors/flink-sql-connector-kafka/pom.xml
@@ -74,7 +74,7 @@ under the License.
 						<!-- Cites a binary dependency on jersey, but this is neither reflected in
 							the dependency graph, nor are any jersey files bundled. -->
 						<exclude>NOTICE</exclude>
-						<include>common/**</include>
+						<exclude>common/**</exclude>







[flink] branch release-1.9 updated: [hotfix][kafka] Fix typo in filter property in pom.xml

2019-07-18 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new 4b08bc4  [hotfix][kafka] Fix typo in filter property in pom.xml
4b08bc4 is described below

commit 4b08bc43536037e7c973661b4a4cae4e314ef514
Author: Zhenghua Gao 
AuthorDate: Thu Jul 18 15:51:14 2019 +0800

[hotfix][kafka] Fix typo in filter property in pom.xml

This closes #9142
---
 flink-connectors/flink-sql-connector-kafka/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-connectors/flink-sql-connector-kafka/pom.xml b/flink-connectors/flink-sql-connector-kafka/pom.xml
index cc1086d..483dff8 100644
--- a/flink-connectors/flink-sql-connector-kafka/pom.xml
+++ b/flink-connectors/flink-sql-connector-kafka/pom.xml
@@ -74,7 +74,7 @@ under the License.
 						<!-- Cites a binary dependency on jersey, but this is neither reflected in
 							the dependency graph, nor are any jersey files bundled. -->
 						<exclude>NOTICE</exclude>
-						<include>common/**</include>
+						<exclude>common/**</exclude>







[flink] branch master updated: [FLINK-13256] Ensure periodical checkpointing continues when regional failover aborts pending checkpoints

2019-07-18 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 1ec3424  [FLINK-13256] Ensure periodical checkpointing continues when regional failover aborts pending checkpoints
1ec3424 is described below

commit 1ec34249a0303ae64d049d177057ef9b6c413ab5
Author: Yun Tang 
AuthorDate: Thu Jul 18 15:58:21 2019 +0800

[FLINK-13256] Ensure periodical checkpointing continues when regional failover aborts pending checkpoints

This closes #9128.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 139 ---
 .../executiongraph/failover/FailoverRegion.java|   2 +-
 .../FailoverStrategyCheckpointCoordinatorTest.java | 186 +
 3 files changed, 264 insertions(+), 63 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 3dc5c1d..682685c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -478,32 +478,9 @@ public class CheckpointCoordinator {
 				throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
 			}
 
-			// if too many checkpoints are currently in progress, we need to mark that a request is queued
-			if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
-				triggerRequestQueued = true;
-				if (currentPeriodicTrigger != null) {
-					currentPeriodicTrigger.cancel(false);
-					currentPeriodicTrigger = null;
-				}
-				throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
-			}
+			checkConcurrentCheckpoints();
 
-			// make sure the minimum interval between checkpoints has passed
-			final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
-			final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
-
-			if (durationTillNextMillis > 0) {
-				if (currentPeriodicTrigger != null) {
-					currentPeriodicTrigger.cancel(false);
-					currentPeriodicTrigger = null;
-				}
-				// Reassign the new trigger to the currentPeriodicTrigger
-				currentPeriodicTrigger = timer.scheduleAtFixedRate(
-					new ScheduledTrigger(),
-					durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
-
-				throw new CheckpointException(CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
-			}
+			checkMinPauseBetweenCheckpoints();
 		}
 	}
 
@@ -623,32 +600,9 @@ public class CheckpointCoordinator {
 					throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
 				}
 
-					if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
-						triggerRequestQueued = true;
-						if (currentPeriodicTrigger != null) {
-							currentPeriodicTrigger.cancel(false);
-							currentPeriodicTrigger = null;
-						}
-						throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
-					}
-
-					// make sure the minimum interval between checkpoints has passed
-					final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
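
The net effect of the hunks above is that two inlined guard blocks become reusable methods, so every trigger path re-schedules the periodic trigger the same way. A sketch of the extracted helpers, reconstructed from the inline logic removed above (the committed bodies may differ in detail; treat this as an approximation, not the authoritative source):

	private void checkConcurrentCheckpoints() throws CheckpointException {
		// too many checkpoints in flight: queue the request and stop the periodic trigger
		if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
			triggerRequestQueued = true;
			if (currentPeriodicTrigger != null) {
				currentPeriodicTrigger.cancel(false);
				currentPeriodicTrigger = null;
			}
			throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
		}
	}

	private void checkMinPauseBetweenCheckpoints() throws CheckpointException {
		final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
		final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
		if (durationTillNextMillis > 0) {
			// the minimum pause has not yet passed: re-schedule the periodic trigger
			if (currentPeriodicTrigger != null) {
				currentPeriodicTrigger.cancel(false);
				currentPeriodicTrigger = null;
			}
			currentPeriodicTrigger = timer.scheduleAtFixedRate(
				new ScheduledTrigger(), durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
			throw new CheckpointException(CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
		}
	}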

[flink] branch release-1.9 updated: [FLINK-13256] Ensure periodical checkpointing continues when regional failover aborts pending checkpoints

2019-07-18 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new b7bfafc  [FLINK-13256] Ensure periodical checkpointing continues when regional failover aborts pending checkpoints
b7bfafc is described below

commit b7bfafca14cd6995a6a59d68d14b4bc3cf91cfb5
Author: Yun Tang 
AuthorDate: Thu Jul 18 09:58:21 2019 +0200

[FLINK-13256] Ensure periodical checkpointing continues when regional failover aborts pending checkpoints

This closes #9128.

(cherry picked from commit 1ec34249a0303ae64d049d177057ef9b6c413ab5)
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 139 ---
 .../executiongraph/failover/FailoverRegion.java|   2 +-
 .../FailoverStrategyCheckpointCoordinatorTest.java | 186 +
 3 files changed, 264 insertions(+), 63 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 3dc5c1d..682685c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -478,32 +478,9 @@ public class CheckpointCoordinator {
 				throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
 			}
 
-			// if too many checkpoints are currently in progress, we need to mark that a request is queued
-			if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
-				triggerRequestQueued = true;
-				if (currentPeriodicTrigger != null) {
-					currentPeriodicTrigger.cancel(false);
-					currentPeriodicTrigger = null;
-				}
-				throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
-			}
+			checkConcurrentCheckpoints();
 
-			// make sure the minimum interval between checkpoints has passed
-			final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
-			final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
-
-			if (durationTillNextMillis > 0) {
-				if (currentPeriodicTrigger != null) {
-					currentPeriodicTrigger.cancel(false);
-					currentPeriodicTrigger = null;
-				}
-				// Reassign the new trigger to the currentPeriodicTrigger
-				currentPeriodicTrigger = timer.scheduleAtFixedRate(
-					new ScheduledTrigger(),
-					durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
-
-				throw new CheckpointException(CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
-			}
+			checkMinPauseBetweenCheckpoints();
 		}
 	}
 
@@ -623,32 +600,9 @@ public class CheckpointCoordinator {
 					throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
 				}
 
-					if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
-						triggerRequestQueued = true;
-						if (currentPeriodicTrigger != null) {
-							currentPeriodicTrigger.cancel(false);
-							currentPeriodicTrigger = null;
-						}
-						throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
-					}
-
-					// make sure the minimum interval between checkpoints has passed
-					final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;

[flink] branch release-1.9 updated (b7bfafc -> 609158b)

2019-07-18 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


from b7bfafc  [FLINK-13256] Ensure periodical checkpointing continues when regional failover aborts pending checkpoints
 add 609158b  [FLINK-13316][legal] Update binary licensing

No new revisions were added by this update.

Summary of changes:
 NOTICE-binary  | 17280 +++
 .../licenses => licenses-binary}/LICENSE.@angular  | 0
 .../LICENSE.ant-design-palettes| 0
 licenses-binary/LICENSE.arpack_combined_all| 8 -
 licenses-binary/{LICENSE.asm.txt => LICENSE.asm}   | 0
 licenses-binary/LICENSE.cddlv1.0   |   129 -
 licenses-binary/LICENSE.cddlv1.1   |   348 -
 licenses-binary/LICENSE.core   |49 -
 .../licenses => licenses-binary}/LICENSE.core-js   | 0
 .../licenses => licenses-binary}/LICENSE.influx| 0
 licenses-binary/LICENSE.jsch   |30 -
 licenses-binary/LICENSE.jtransforms|   480 -
 licenses-binary/LICENSE.machinist  |19 -
 .../LICENSE.monaco-editor  | 0
 .../LICENSE.ng-zorro-antd  | 0
 licenses-binary/LICENSE.paranamer  |28 -
 .../licenses => licenses-binary}/LICENSE.rxjs  | 0
 licenses-binary/LICENSE.spire  |19 -
 .../LICENSE.tinycolor2 | 0
 .../licenses => licenses-binary}/LICENSE.tslib | 0
 licenses-binary/LICENSE.xmlenc |27 -
 .../licenses => licenses-binary}/LICENSE.zone  | 0
 22 files changed, 13937 insertions(+), 4480 deletions(-)
 copy {flink-runtime-web/src/main/resources/META-INF/licenses => licenses-binary}/LICENSE.@angular (100%)
 copy {flink-runtime-web/src/main/resources/META-INF/licenses => licenses-binary}/LICENSE.ant-design-palettes (100%)
 delete mode 100644 licenses-binary/LICENSE.arpack_combined_all
 rename licenses-binary/{LICENSE.asm.txt => LICENSE.asm} (100%)
 delete mode 100644 licenses-binary/LICENSE.cddlv1.0
 delete mode 100644 licenses-binary/LICENSE.cddlv1.1
 delete mode 100644 licenses-binary/LICENSE.core
 copy {flink-runtime-web/src/main/resources/META-INF/licenses => licenses-binary}/LICENSE.core-js (100%)
 copy {flink-metrics/flink-metrics-influxdb/src/main/resources/META-INF/licenses => licenses-binary}/LICENSE.influx (100%)
 delete mode 100644 licenses-binary/LICENSE.jsch
 delete mode 100644 licenses-binary/LICENSE.jtransforms
 delete mode 100644 licenses-binary/LICENSE.machinist
 copy {flink-runtime-web/src/main/resources/META-INF/licenses => licenses-binary}/LICENSE.monaco-editor (100%)
 copy {flink-runtime-web/src/main/resources/META-INF/licenses => licenses-binary}/LICENSE.ng-zorro-antd (100%)
 delete mode 100644 licenses-binary/LICENSE.paranamer
 copy {flink-runtime-web/src/main/resources/META-INF/licenses => licenses-binary}/LICENSE.rxjs (100%)
 delete mode 100644 licenses-binary/LICENSE.spire
 copy {flink-runtime-web/src/main/resources/META-INF/licenses => licenses-binary}/LICENSE.tinycolor2 (100%)
 copy {flink-runtime-web/src/main/resources/META-INF/licenses => licenses-binary}/LICENSE.tslib (100%)
 delete mode 100644 licenses-binary/LICENSE.xmlenc
 copy {flink-runtime-web/src/main/resources/META-INF/licenses => licenses-binary}/LICENSE.zone (100%)



[flink] branch master updated: [FLINK-13249][runtime] Fix handling of partition producer responses b… (#9138)

2019-07-18 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 23bd23b  [FLINK-13249][runtime] Fix handling of partition producer responses b… (#9138)
23bd23b is described below

commit 23bd23b325f15c726fcc48c76eb252fa2ed2fe59
Author: Stefan Richter 
AuthorDate: Thu Jul 18 10:37:22 2019 +0200

[FLINK-13249][runtime] Fix handling of partition producer responses b… (#9138)

* [FLINK-13249][runtime] Fix handling of partition producer responses by running them with the task's executor

* Review comments
---
 .../partition/PartitionProducerStateProvider.java|  9 +
 .../io/network/partition/consumer/SingleInputGate.java   |  6 +++---
 .../java/org/apache/flink/runtime/taskmanager/Task.java  | 16 ++--
 .../partition/consumer/SingleInputGateBuilder.java   |  6 +-
 .../org/apache/flink/runtime/taskmanager/TaskTest.java   |  8 
 5 files changed, 23 insertions(+), 22 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java
index 8bbdaa5..5785095 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.types.Either;
 
-import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 
 /**
  * Request execution state of partition producer, the response accepts state check callbacks.
@@ -34,11 +34,12 @@ public interface PartitionProducerStateProvider {
 	 * @param intermediateDataSetId ID of the parent intermediate data set.
 	 * @param resultPartitionId ID of the result partition to check. This
 	 * identifies the producing execution and partition.
-	 * @return a future with response handle.
+	 * @param responseConsumer consumer for the response handle.
 	 */
-	CompletableFuture<ResponseHandle> requestPartitionProducerState(
+	void requestPartitionProducerState(
 		IntermediateDataSetID intermediateDataSetId,
-		ResultPartitionID resultPartitionId);
+		ResultPartitionID resultPartitionId,
+		Consumer<ResponseHandle> responseConsumer);
 
 	/**
 	 * Result of state query, accepts state check callbacks.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index bd75262..534078d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -601,8 +601,8 @@ public class SingleInputGate extends InputGate {
 	void triggerPartitionStateCheck(ResultPartitionID partitionId) {
 		partitionProducerStateProvider.requestPartitionProducerState(
 			consumedResultId,
-			partitionId)
-			.thenAccept(responseHandle -> {
+			partitionId,
+			((PartitionProducerStateProvider.ResponseHandle responseHandle) -> {
 				boolean isProducingState = new RemoteChannelStateChecker(partitionId, owningTaskName)
 					.isProducerReadyOrAbortConsumption(responseHandle);
 				if (isProducingState) {
@@ -612,7 +612,7 @@ public class SingleInputGate extends InputGate {
 					responseHandle.failConsumption(t);
 				}
 			}
-		});
+		}));
 	}
 
 	private void queueChannel(InputChannel channel) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index d4e1d8a..12049f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -99,6 +99,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
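
The interface change above replaces a returned CompletableFuture with an explicit Consumer callback, which lets the provider decide which thread executes the response handling. A minimal, self-contained Java sketch of that pattern, using toy names rather than Flink's actual classes:

	import java.util.concurrent.Executor;
	import java.util.concurrent.Executors;
	import java.util.function.Consumer;

	public class CallbackStateDemo {

		// toy analogue of PartitionProducerStateProvider
		interface StateProvider {
			void requestState(String partitionId, Consumer<String> responseConsumer);
		}

		public static void main(String[] args) {
			// stands in for the task's executor
			Executor taskExecutor = Executors.newSingleThreadExecutor();

			// the provider receives the response on some I/O thread, but hands
			// the callback over to the task's executor before invoking it
			StateProvider provider = (partitionId, responseConsumer) ->
				new Thread(() ->
					taskExecutor.execute(() -> responseConsumer.accept("RUNNING"))
				).start();

			provider.requestState("partition-0", state ->
				System.out.println(Thread.currentThread().getName() + " handled state " + state));
		}
	}

With a CompletableFuture return value, a thenAccept continuation may run on whichever thread completes the future; passing the consumer in lets the implementation route it through an executor of its choosing instead.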

[flink] branch release-1.9 updated: [FLINK-13249][runtime] Fix handling of partition producer responses by running them with the task's executor

2019-07-18 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new 3eff638  [FLINK-13249][runtime] Fix handling of partition producer responses by running them with the task's executor
3eff638 is described below

commit 3eff6387b5f6716dee5c17b71b10c08760b946cc
Author: Stefan Richter 
AuthorDate: Thu Jul 18 10:37:22 2019 +0200

[FLINK-13249][runtime] Fix handling of partition producer responses by running them with the task's executor

Fixes the problem in FLINK-13249 by ensuring that processing the partition producer response is not blocking any netty thread, but is always executed by the task's executor.

(cherry picked from commit 23bd23b325f15c726fcc48c76eb252fa2ed2fe59)
---
 .../partition/PartitionProducerStateProvider.java|  9 +
 .../io/network/partition/consumer/SingleInputGate.java   |  6 +++---
 .../java/org/apache/flink/runtime/taskmanager/Task.java  | 16 ++--
 .../partition/consumer/SingleInputGateBuilder.java   |  6 +-
 .../org/apache/flink/runtime/taskmanager/TaskTest.java   |  8 
 5 files changed, 23 insertions(+), 22 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java
index 8bbdaa5..5785095 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionProducerStateProvider.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.types.Either;
 
-import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 
 /**
  * Request execution state of partition producer, the response accepts state check callbacks.
@@ -34,11 +34,12 @@ public interface PartitionProducerStateProvider {
 	 * @param intermediateDataSetId ID of the parent intermediate data set.
 	 * @param resultPartitionId ID of the result partition to check. This
 	 * identifies the producing execution and partition.
-	 * @return a future with response handle.
+	 * @param responseConsumer consumer for the response handle.
 	 */
-	CompletableFuture<ResponseHandle> requestPartitionProducerState(
+	void requestPartitionProducerState(
 		IntermediateDataSetID intermediateDataSetId,
-		ResultPartitionID resultPartitionId);
+		ResultPartitionID resultPartitionId,
+		Consumer<ResponseHandle> responseConsumer);
 
 	/**
 	 * Result of state query, accepts state check callbacks.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index bd75262..534078d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -601,8 +601,8 @@ public class SingleInputGate extends InputGate {
 	void triggerPartitionStateCheck(ResultPartitionID partitionId) {
 		partitionProducerStateProvider.requestPartitionProducerState(
 			consumedResultId,
-			partitionId)
-			.thenAccept(responseHandle -> {
+			partitionId,
+			((PartitionProducerStateProvider.ResponseHandle responseHandle) -> {
 				boolean isProducingState = new RemoteChannelStateChecker(partitionId, owningTaskName)
 					.isProducerReadyOrAbortConsumption(responseHandle);
 				if (isProducingState) {
@@ -612,7 +612,7 @@ public class SingleInputGate extends InputGate {
 					responseHandle.failConsumption(t);
 				}
 			}
-		});
+		}));
 	}
 
 	private void queueChannel(InputChannel channel) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index d4e1d8a..12049f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -99,6 +99,7 @@ import java.util.concurrent.Future;

[flink] branch master updated: [FLINK-13281][tests] Improve test for aborting pending checkpoints on failover

2019-07-18 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 59a4d2c  [FLINK-13281][tests] Improve test for aborting pending checkpoints on failover
59a4d2c is described below

commit 59a4d2c1b52f8cceb5e4e287ee3a94e129d505e7
Author: Yun Tang 
AuthorDate: Thu Jul 18 17:59:02 2019 +0800

[FLINK-13281][tests] Improve test for aborting pending checkpoints on failover
---
 ...egionStrategyNGAbortPendingCheckpointsTest.java | 47 +++---
 1 file changed, 32 insertions(+), 15 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java
index 899d490..acfb396 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java
@@ -30,13 +30,14 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
-import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.TestLogger;
@@ -51,8 +52,8 @@ import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -76,27 +77,41 @@ public class AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest
 		final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
 
 		final Iterator<ExecutionVertex> vertexIterator = executionGraph.getAllExecutionVertices().iterator();
-		final ExecutionVertex onlyExecutionVertex = vertexIterator.next();
+		final ExecutionVertex firstExecutionVertex = vertexIterator.next();
 
-		setTaskRunning(executionGraph, onlyExecutionVertex);
+		setTasksRunning(executionGraph, firstExecutionVertex, vertexIterator.next());
 
 		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
 		checkState(checkpointCoordinator != null);
 
 		checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(),  false);
-		final int pendingCheckpointsBeforeFailure = checkpointCoordinator.getNumberOfPendingCheckpoints();
+		assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+		long checkpointId = checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
 
-		failVertex(onlyExecutionVertex);
+		AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
+			jobGraph.getJobID(),
+			firstExecutionVertex.getCurrentExecutionAttempt().getAttemptId(),
+			checkpointId);
+
+		// let the first vertex acknowledge the checkpoint, and fail it afterwards
+		// the failover strategy should then cancel all pending checkpoints on restart
+		checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "Unknown location");
+		assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+
+		failVertex(firstExecutionVertex);
+		assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+		manualMainThreadExecutor.triggerScheduledTasks();
 
-		assertThat(pendingCheckpointsBeforeFailure,

[flink] branch release-1.9 updated: [FLINK-13281][tests] Improve test for aborting pending checkpoints on failover

2019-07-18 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new ca837bd  [FLINK-13281][tests] Improve test for aborting pending checkpoints on failover
ca837bd is described below

commit ca837bd8d467975df9101f6ea5c19cb48cef5ac5
Author: Yun Tang 
AuthorDate: Thu Jul 18 17:59:02 2019 +0800

[FLINK-13281][tests] Improve test for aborting pending checkpoints on failover
---
 ...egionStrategyNGAbortPendingCheckpointsTest.java | 47 +++---
 1 file changed, 32 insertions(+), 15 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java
index 899d490..acfb396 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java
@@ -30,13 +30,14 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
-import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.TestLogger;
@@ -51,8 +52,8 @@ import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -76,27 +77,41 @@ public class AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest
 		final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
 
 		final Iterator<ExecutionVertex> vertexIterator = executionGraph.getAllExecutionVertices().iterator();
-		final ExecutionVertex onlyExecutionVertex = vertexIterator.next();
+		final ExecutionVertex firstExecutionVertex = vertexIterator.next();
 
-		setTaskRunning(executionGraph, onlyExecutionVertex);
+		setTasksRunning(executionGraph, firstExecutionVertex, vertexIterator.next());
 
 		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
 		checkState(checkpointCoordinator != null);
 
 		checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(),  false);
-		final int pendingCheckpointsBeforeFailure = checkpointCoordinator.getNumberOfPendingCheckpoints();
+		assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+		long checkpointId = checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
 
-		failVertex(onlyExecutionVertex);
+		AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
+			jobGraph.getJobID(),
+			firstExecutionVertex.getCurrentExecutionAttempt().getAttemptId(),
+			checkpointId);
+
+		// let the first vertex acknowledge the checkpoint, and fail it afterwards
+		// the failover strategy should then cancel all pending checkpoints on restart
+		checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "Unknown location");
+		assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+
+		failVertex(firstExecutionVertex);
+		assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+		manualMainThreadExecutor.triggerScheduledTasks();
 
-		assertThat(pendingCheckpointsBeforeFailure,

[flink] branch master updated (59a4d2c -> c52f14b)

2019-07-18 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 59a4d2c  [FLINK-13281][tests] Improve test for aborting pending checkpoints on failover
 add 33e8400  [FLINK-13320][catalog][docs] Remove broken link
 add c52f14b  [FLINK-13319][catalog][docs] Add chinese version

No new revisions were added by this update.

Summary of changes:
 docs/dev/table/catalog.md| 4 
 docs/dev/table/{catalog.md => catalog.zh.md} | 4 
 2 files changed, 8 deletions(-)
 copy docs/dev/table/{catalog.md => catalog.zh.md} (98%)



[flink] 01/02: [FLINK-13320][catalog][docs] Remove broken link

2019-07-18 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fc078d6f0a23c58bd3f930567e9bbcce59f2a408
Author: Chesnay Schepler 
AuthorDate: Thu Jul 18 12:13:26 2019 +0200

[FLINK-13320][catalog][docs] Remove broken link
---
 docs/dev/table/catalog.md | 4 
 1 file changed, 4 deletions(-)

diff --git a/docs/dev/table/catalog.md b/docs/dev/table/catalog.md
index 27eab7c..c4a29cc 100644
--- a/docs/dev/table/catalog.md
+++ b/docs/dev/table/catalog.md
@@ -194,10 +194,6 @@ The following limitations in Hive's data types impact the mapping between Flink
 
 \** maximum length is 65535
 
-## Hive Compatibility
-
-For Hive compatibility and versions, see [Hive Compatibility]({{ site.baseurl }}/dev/batch/hive_compatibility.html)
-
 
 Catalog Registration
 



[flink] 02/02: [FLINK-13319][catalog][docs] Add chinese version

2019-07-18 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cbf010f1485b10f101ad8ed56dd32665d3dfd7b7
Author: Chesnay Schepler 
AuthorDate: Thu Jul 18 12:13:44 2019 +0200

[FLINK-13319][catalog][docs] Add chinese version
---
 docs/dev/table/catalog.zh.md | 363 +++
 1 file changed, 363 insertions(+)

diff --git a/docs/dev/table/catalog.zh.md b/docs/dev/table/catalog.zh.md
new file mode 100644
index 000..c4a29cc
--- /dev/null
+++ b/docs/dev/table/catalog.zh.md
@@ -0,0 +1,363 @@
+---
+title: "Catalog"
+is_beta: true
+nav-parent_id: tableapi
+nav-pos: 100
+---
+
+
+Catalogs provide metadata, such as names, schemas, and statistics of tables, and information about how to access data stored in a database or other external systems. Once a catalog is registered within a `TableEnvironment`, all its meta-objects are accessible from the Table API and SQL queries.
+
+
+* This will be replaced by the TOC
+{:toc}
+
+
+Catalog Interface
+-
+
+APIs are defined in the `Catalog` interface. The interface defines a set of APIs to read and write catalog meta-objects such as databases, tables, partitions, views, and functions.
+
+
+Catalog Meta-Objects Naming Structure
+-
+
+Flink's catalogs use a strict two-level structure; that is, catalogs contain databases, and databases contain meta-objects. Thus, the full name of a meta-object is always structured as `catalogName`.`databaseName`.`objectName`.
+
+Each `TableEnvironment` has a `CatalogManager` to manage all registered catalogs. To ease access to meta-objects, `CatalogManager` has a concept of current catalog and current database. By setting the current catalog and current database, users can use just the meta-object's name in their queries. This greatly simplifies the user experience.
+
+For example, a query such as
+
+```sql
+select * from mycatalog.mydb.myTable;
+```
+
+can be shortened to
+
+```sql
+select * from myTable;
+```
+
+To query tables in a different database under the current catalog, users don't need to specify the catalog name. In our example, it would be
+
+```
+select * from mydb2.myTable2
+```
+
+`CatalogManager` always has a built-in `GenericInMemoryCatalog` named `default_catalog`, which has a built-in default database named `default_database`. If no other catalog and database are explicitly set, they will be the current catalog and current database by default. All temp meta-objects, such as those defined by `TableEnvironment#registerTable`, are registered to this catalog.
+
+Users can set the current catalog and database via `TableEnvironment.useCatalog(...)` and `TableEnvironment.useDatabase(...)` in the Table API, or `USE CATALOG ...` and `USE DATABASE ...` in Flink SQL.
+
+
+Catalog Types
+-
+
+## GenericInMemoryCatalog
+
+The default catalog; all meta-objects in this catalog are stored in memory and will be lost once the session shuts down.
+
+Its config entry value in the SQL CLI yaml file is "generic_in_memory".
+
+## HiveCatalog
+
+Flink's `HiveCatalog` can read and write both Flink and Hive meta-objects using Hive Metastore as persistent storage.
+
+Its config entry value in the SQL CLI yaml file is "hive".
+
+### Persist Flink meta-objects
+
+Historically, Flink meta-objects were only stored in memory on a per-session basis. That means users had to recreate all the meta-objects every time they started a new session.
+
+To maintain meta-objects across sessions, users can choose to use `HiveCatalog` to persist all of their Flink streaming (unbounded-stream) and batch (bounded-stream) meta-objects. Because Hive Metastore is only used for storage, Hive itself may not understand Flink's meta-objects stored in the metastore.
+
+### Integrate Flink with Hive metadata
+
+The ultimate goal of integrating Flink with Hive metadata is that:
+
+1. Existing meta-objects, like tables, views, and functions, created by Hive or other Hive-compatible applications can be used by Flink
+
+2. Meta-objects created by `HiveCatalog` can be written back to Hive metastore such that Hive and other Hive-compatible applications can consume them.
+
+## User-configured Catalog
+
+Catalogs are pluggable. Users can develop custom catalogs by implementing the `Catalog` interface, which defines a set of APIs for reading and writing catalog meta-objects such as databases, tables, partitions, views, and functions.
+
+
+HiveCatalog
+---
+
+## Supported Hive Versions
+
+Flink's `HiveCatalog` officially supports Hive 2.3.4 and 1.2.1.
+
+The Hive version is explicitly specified as a string, either by passing it to the constructor when creating `HiveCatalog` instances directly in the Table API, or by specifying it in the yaml config file in the SQL CLI. The supported Hive version strings are `2.3.4` and `1.2.1`.
+
+## Case Insensitive to Meta-Object Names
+
+Note
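
As a quick illustration of the catalog and database switching described in the document added above, a minimal Java sketch against the Flink 1.9 Table API (the catalog and table names are hypothetical, and catalog registration is assumed to have happened elsewhere):

	import org.apache.flink.table.api.EnvironmentSettings;
	import org.apache.flink.table.api.Table;
	import org.apache.flink.table.api.TableEnvironment;

	public class CurrentCatalogDemo {
		public static void main(String[] args) {
			TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

			// a fully qualified name works regardless of the current catalog/database
			Table t1 = tableEnv.sqlQuery("select * from mycatalog.mydb.myTable");

			// make mycatalog.mydb current, so short names resolve against it
			tableEnv.useCatalog("mycatalog");
			tableEnv.useDatabase("mydb");
			Table t2 = tableEnv.sqlQuery("select * from myTable");
		}
	}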

[flink] branch release-1.9 updated (ca837bd -> cbf010f)

2019-07-18 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


from ca837bd  [FLINK-13281][tests] Improve test for aborting pending checkpoints on failover
 new fc078d6  [FLINK-13320][catalog][docs] Remove broken link
 new cbf010f  [FLINK-13319][catalog][docs] Add chinese version

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/table/catalog.md| 4 
 docs/dev/table/{catalog.md => catalog.zh.md} | 4 
 2 files changed, 8 deletions(-)
 copy docs/dev/table/{catalog.md => catalog.zh.md} (98%)



[flink] branch release-1.9 updated (cbf010f -> 41e56d9)

2019-07-18 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


from cbf010f  [FLINK-13319][catalog][docs] Add chinese version
 add 5828f67  [FLINK-12578][build] Use secure MapR repository by default
 add 41e56d9  [FLINK-12578][build] Add fallback unsafe MapR repository

No new revisions were added by this update.

Summary of changes:
 flink-filesystems/flink-mapr-fs/pom.xml | 22 +-
 pom.xml |  2 +-
 tools/travis_watchdog.sh|  4 +++-
 3 files changed, 25 insertions(+), 3 deletions(-)



[flink] branch master updated (c52f14b -> 07ce142)

2019-07-18 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from c52f14b  [FLINK-13319][catalog][docs] Add chinese version
 new 5c36c65  [FLINK-12578][build] Use secure MapR repository by default
 new 07ce142  [FLINK-12578][build] Add fallback unsafe MapR repository

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-filesystems/flink-mapr-fs/pom.xml | 22 +-
 pom.xml |  2 +-
 tools/travis_watchdog.sh|  4 +++-
 3 files changed, 25 insertions(+), 3 deletions(-)



[flink] 01/02: [FLINK-12578][build] Use secure MapR repository by default

2019-07-18 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5c36c650e6520d92191ce2da33f7dcae774319f6
Author: Chesnay Schepler 
AuthorDate: Thu Jul 18 13:00:50 2019 +0200

[FLINK-12578][build] Use secure MapR repository by default
---
 flink-filesystems/flink-mapr-fs/pom.xml | 2 +-
 pom.xml | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-filesystems/flink-mapr-fs/pom.xml b/flink-filesystems/flink-mapr-fs/pom.xml
index 74b9170..5238423 100644
--- a/flink-filesystems/flink-mapr-fs/pom.xml
+++ b/flink-filesystems/flink-mapr-fs/pom.xml
@@ -35,7 +35,7 @@ under the License.
 		<repository>
 			<id>mapr-releases</id>
-			<url>http://repository.mapr.com/maven/</url>
+			<url>https://repository.mapr.com/maven/</url>
 			<snapshots><enabled>false</enabled></snapshots>
 			<releases><enabled>true</enabled></releases>
 		</repository>
diff --git a/pom.xml b/pom.xml
index 1911d85..3f7cd8b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1006,7 +1006,7 @@ under the License.
 			<repository>
 				<id>mapr-releases</id>
-				<url>http://repository.mapr.com/maven/</url>
+				<url>https://repository.mapr.com/maven/</url>
 				<snapshots><enabled>false</enabled></snapshots>
 				<releases><enabled>true</enabled></releases>
 			</repository>




[flink] 02/02: [FLINK-12578][build] Add fallback unsafe MapR repository

2019-07-18 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 07ce142259a08eaec2e044c31d5beaa7b19ae6b6
Author: Chesnay Schepler 
AuthorDate: Thu Jul 18 13:01:11 2019 +0200

[FLINK-12578][build] Add fallback unsafe MapR repository
---
 flink-filesystems/flink-mapr-fs/pom.xml | 20 
 tools/travis_watchdog.sh|  4 +++-
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/flink-filesystems/flink-mapr-fs/pom.xml b/flink-filesystems/flink-mapr-fs/pom.xml
index 5238423..e32954c 100644
--- a/flink-filesystems/flink-mapr-fs/pom.xml
+++ b/flink-filesystems/flink-mapr-fs/pom.xml
@@ -41,6 +41,26 @@ under the License.
 		</repository>
 	</repositories>
 
+	<profiles>
+		<profile>
+			<id>unsafe-mapr-repo</id>
+			<activation>
+				<property>
+					<name>unsafe-mapr-repo</name>
+				</property>
+			</activation>
+			<repositories>
+				<repository>
+					<id>mapr-releases</id>
+					<url>http://repository.mapr.com/maven/</url>
+					<snapshots><enabled>false</enabled></snapshots>
+					<releases><enabled>true</enabled></releases>
+				</repository>
+			</repositories>
+		</profile>
+	</profiles>
+
diff --git a/tools/travis_watchdog.sh b/tools/travis_watchdog.sh
index b6214fa..b9b2b7d 100755
--- a/tools/travis_watchdog.sh
+++ b/tools/travis_watchdog.sh
@@ -59,8 +59,10 @@ MVN_TEST_MODULES=$(get_test_modules_for_stage ${TEST})
 #
 # -nsu option forbids downloading snapshot artifacts. The only snapshot artifacts we depend are from
 # Flink, which however should all be built locally. see FLINK-7230
+#
+# We use -Punsafe-mapr-repo since the https version fails on Travis for some reason.
 MVN_LOGGING_OPTIONS="-Dlog.dir=${ARTIFACTS_DIR} -Dlog4j.configuration=file://$LOG4J_PROPERTIES -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"
-MVN_COMMON_OPTIONS="-nsu -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dfast -B -Pskip-webui-build $MVN_LOGGING_OPTIONS"
+MVN_COMMON_OPTIONS="-nsu -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dfast -B -Pskip-webui-build -Punsafe-mapr-repo $MVN_LOGGING_OPTIONS"
 MVN_COMPILE_OPTIONS="-DskipTests"
 MVN_TEST_OPTIONS="$MVN_LOGGING_OPTIONS -Dflink.tests.with-openssl"
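
With the profile in place, a local build can fall back to the insecure mirror by activating it explicitly, for example with "mvn clean install -Punsafe-mapr-repo" (an illustrative invocation); the Travis script change above does the same for CI.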
 



[flink] 02/02: [FLINK-12578][build] Add fallback unsafe MapR repository

2019-07-18 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 148e0dde71786821e469455405b3bf51052498ed
Author: Chesnay Schepler 
AuthorDate: Thu Jul 18 13:01:11 2019 +0200

[FLINK-12578][build] Add fallback unsafe MapR repository
---
 flink-filesystems/flink-mapr-fs/pom.xml | 20 
 tools/travis_mvn_watchdog.sh|  4 +++-
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/flink-filesystems/flink-mapr-fs/pom.xml b/flink-filesystems/flink-mapr-fs/pom.xml
index 84aecae..2683a8e 100644
--- a/flink-filesystems/flink-mapr-fs/pom.xml
+++ b/flink-filesystems/flink-mapr-fs/pom.xml
@@ -41,6 +41,26 @@ under the License.
 		</repository>
 	</repositories>
 
+	<profiles>
+		<profile>
+			<id>unsafe-mapr-repo</id>
+			<activation>
+				<property>
+					<name>unsafe-mapr-repo</name>
+				</property>
+			</activation>
+			<repositories>
+				<repository>
+					<id>mapr-releases</id>
+					<url>http://repository.mapr.com/maven/</url>
+					<snapshots><enabled>false</enabled></snapshots>
+					<releases><enabled>true</enabled></releases>
+				</repository>
+			</repositories>
+		</profile>
+	</profiles>
+
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index 444e819..00d7b77 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -54,8 +54,10 @@ MVN_TEST_MODULES=$(get_test_modules_for_stage ${TEST})
 #
 # -nsu option forbids downloading snapshot artifacts. The only snapshot artifacts we depend are from
 # Flink, which however should all be built locally. see FLINK-7230
+#
+# We use -Punsafe-mapr-repo since the https version fails on Travis for some reason.
 MVN_LOGGING_OPTIONS="-Dlog.dir=${ARTIFACTS_DIR} -Dlog4j.configuration=file://$LOG4J_PROPERTIES -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"
-MVN_COMMON_OPTIONS="-nsu -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dfast -B $MVN_LOGGING_OPTIONS"
+MVN_COMMON_OPTIONS="-nsu -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dfast -B -Punsafe-mapr-repo $MVN_LOGGING_OPTIONS"
 MVN_COMPILE_OPTIONS="-DskipTests"
 MVN_TEST_OPTIONS="$MVN_LOGGING_OPTIONS"
 



[flink] 01/02: [FLINK-12578][build] Use secure MapR repository by default

2019-07-18 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fe80cb57296e8a295fc76188c5cf002444c754b8
Author: Chesnay Schepler 
AuthorDate: Thu Jul 18 13:00:50 2019 +0200

[FLINK-12578][build] Use secure MapR repository by default
---
 flink-filesystems/flink-mapr-fs/pom.xml | 2 +-
 pom.xml | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-filesystems/flink-mapr-fs/pom.xml b/flink-filesystems/flink-mapr-fs/pom.xml
index 48d02a4..84aecae 100644
--- a/flink-filesystems/flink-mapr-fs/pom.xml
+++ b/flink-filesystems/flink-mapr-fs/pom.xml
@@ -35,7 +35,7 @@ under the License.
 		<repository>
 			<id>mapr-releases</id>
-			<url>http://repository.mapr.com/maven/</url>
+			<url>https://repository.mapr.com/maven/</url>
 			<snapshots><enabled>false</enabled></snapshots>
 			<releases><enabled>true</enabled></releases>
 		</repository>
diff --git a/pom.xml b/pom.xml
index 21f4265..95cedeb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -988,7 +988,7 @@ under the License.
 			<repository>
 				<id>mapr-releases</id>
-				<url>http://repository.mapr.com/maven/</url>
+				<url>https://repository.mapr.com/maven/</url>
 				<snapshots><enabled>false</enabled></snapshots>
 				<releases><enabled>true</enabled></releases>
 			</repository>




[flink] branch release-1.8 updated (54c44eb -> 148e0dd)

2019-07-18 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 54c44eb  [hotfix][tests][coordination] Move idle task manager release tests into a separate suite
 new fe80cb5  [FLINK-12578][build] Use secure MapR repository by default
 new 148e0dd  [FLINK-12578][build] Add fallback unsafe MapR repository

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-filesystems/flink-mapr-fs/pom.xml | 22 +-
 pom.xml |  2 +-
 tools/travis_mvn_watchdog.sh|  4 +++-
 3 files changed, 25 insertions(+), 3 deletions(-)



[flink] 02/02: [FLINK-12578][build] Add fallback unsafe MapR repository

2019-07-18 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f092d755b9a3046c14abf80aa1bb3f1347485d3c
Author: Chesnay Schepler 
AuthorDate: Thu Jul 18 13:01:11 2019 +0200

[FLINK-12578][build] Add fallback unsafe MapR repository
---
 flink-filesystems/flink-mapr-fs/pom.xml | 20 
 tools/travis_mvn_watchdog.sh|  4 +++-
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/flink-filesystems/flink-mapr-fs/pom.xml b/flink-filesystems/flink-mapr-fs/pom.xml
index 036bad0..1c2117c 100644
--- a/flink-filesystems/flink-mapr-fs/pom.xml
+++ b/flink-filesystems/flink-mapr-fs/pom.xml
@@ -41,6 +41,26 @@ under the License.
 		</repository>
 	</repositories>
 
+	<profiles>
+		<profile>
+			<id>unsafe-mapr-repo</id>
+			<activation>
+				<property>
+					<name>unsafe-mapr-repo</name>
+				</property>
+			</activation>
+			<repositories>
+				<repository>
+					<id>mapr-releases</id>
+					<url>http://repository.mapr.com/maven/</url>
+					<snapshots><enabled>false</enabled></snapshots>
+					<releases><enabled>true</enabled></releases>
+				</repository>
+			</repositories>
+		</profile>
+	</profiles>
+
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index 63c1772..b2c4369 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -54,8 +54,10 @@ MVN_TEST_MODULES=$(get_test_modules_for_stage ${TEST})
 #
 # -nsu option forbids downloading snapshot artifacts. The only snapshot artifacts we depend are from
 # Flink, which however should all be built locally. see FLINK-7230
+#
+# We use -Punsafe-mapr-repo since the https version fails on Travis for some reason.
 MVN_LOGGING_OPTIONS="-Dlog.dir=${ARTIFACTS_DIR} -Dlog4j.configuration=file://$LOG4J_PROPERTIES -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"
-MVN_COMMON_OPTIONS="-nsu -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dfast -B $MVN_LOGGING_OPTIONS"
+MVN_COMMON_OPTIONS="-nsu -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dfast -B -Punsafe-mapr-repo $MVN_LOGGING_OPTIONS"
 MVN_COMPILE_OPTIONS="-DskipTests"
 MVN_TEST_OPTIONS="$MVN_LOGGING_OPTIONS"
 



[flink] branch release-1.7 updated (3bcb515 -> f092d75)

2019-07-18 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 3bcb515  [hotfix][tests][coordination] Move idle task manager release tests into a separate suite
 new 022dce5  [FLINK-12578][build] Use secure MapR repository by default
 new f092d75  [FLINK-12578][build] Add fallback unsafe MapR repository

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-filesystems/flink-mapr-fs/pom.xml | 22 +-
 pom.xml |  2 +-
 tools/travis_mvn_watchdog.sh|  4 +++-
 3 files changed, 25 insertions(+), 3 deletions(-)



[flink] 01/02: [FLINK-12578][build] Use secure MapR repository by default

2019-07-18 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 022dce55eada098e1de50aa44431750dc2370dff
Author: Chesnay Schepler 
AuthorDate: Thu Jul 18 13:00:50 2019 +0200

[FLINK-12578][build] Use secure MapR repository by default
---
 flink-filesystems/flink-mapr-fs/pom.xml | 2 +-
 pom.xml | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-filesystems/flink-mapr-fs/pom.xml b/flink-filesystems/flink-mapr-fs/pom.xml
index 1c4351d..036bad0 100644
--- a/flink-filesystems/flink-mapr-fs/pom.xml
+++ b/flink-filesystems/flink-mapr-fs/pom.xml
@@ -35,7 +35,7 @@ under the License.
 		<repository>
 			<id>mapr-releases</id>
-			<url>http://repository.mapr.com/maven/</url>
+			<url>https://repository.mapr.com/maven/</url>
 			<snapshots><enabled>false</enabled></snapshots>
 			<releases><enabled>true</enabled></releases>
 		</repository>
diff --git a/pom.xml b/pom.xml
index d93cdd0..c3aaf3d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -960,7 +960,7 @@ under the License.
 			<repository>
 				<id>mapr-releases</id>
-				<url>http://repository.mapr.com/maven/</url>
+				<url>https://repository.mapr.com/maven/</url>
 				<snapshots><enabled>false</enabled></snapshots>
 				<releases><enabled>true</enabled></releases>
 			</repository>




[flink] 02/04: [FLINK-13287][table-planner] Support Reinterpret cast call in blink planner

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2893f3f597ea3574636d0376bd7823b547fd2217
Author: JingsongLi 
AuthorDate: Wed Jul 17 20:20:22 2019 +0800

[FLINK-13287][table-planner] Support Reinterpret cast call in blink planner
---
 .../expressions/PlannerExpressionConverter.scala   |  8 
 .../flink/table/expressions/Reinterpret.scala  | 45 ++
 2 files changed, 53 insertions(+)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index 208cad9..8b5dada 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -56,6 +56,14 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExpression]
           fromDataTypeToLegacyInfo(
             children(1).asInstanceOf[TypeLiteralExpression].getOutputDataType))
 
+      case REINTERPRET_CAST =>
+        assert(children.size == 3)
+        Reinterpret(
+          children.head.accept(this),
+          fromDataTypeToLegacyInfo(
+            children(1).asInstanceOf[TypeLiteralExpression].getOutputDataType),
+          getValue[Boolean](children(2).accept(this)))
+
       case WINDOW_START =>
         assert(children.size == 1)
         val windowReference = translateWindowReference(children.head)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/Reinterpret.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/Reinterpret.scala
new file mode 100644
index 000..530fd3c
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/Reinterpret.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
+import org.apache.flink.table.typeutils.TypeCoercion
+import org.apache.flink.table.validate._
+
+case class Reinterpret(child: PlannerExpression, resultType: TypeInformation[_],
+    checkOverflow: Boolean) extends UnaryExpression {
+
+  override def toString = s"$child.reinterpret($resultType)"
+
+  override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
+    val child: PlannerExpression = anyRefs.head.asInstanceOf[PlannerExpression]
+    copy(child, resultType).asInstanceOf[this.type]
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (TypeCoercion.canReinterpret(
+      fromTypeInfoToLogicalType(child.resultType), fromTypeInfoToLogicalType(resultType))) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Unsupported reinterpret from ${child.resultType} to $resultType")
+    }
+  }
+}
+



[flink] 03/04: [FLINK-13287][table-api] Port ExistingField to api-java and use new Expression in FieldComputer

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 514b51de832fec320caf9cc73eb39d35910a1c79
Author: JingsongLi 
AuthorDate: Wed Jul 17 20:23:37 2019 +0800

[FLINK-13287][table-api] Port ExistingField to api-java and use new Expression in FieldComputer
---
 .../table/sources/tsextractors/ExistingField.java  | 136 +
 .../table/expressions/ResolvedFieldReference.java  |  29 -
 .../expressions/ExestingFieldFieldReference.scala  |  26 
 .../table/expressions/PlannerExpressionUtils.scala |   6 +-
 .../org/apache/flink/table/expressions/call.scala  |   4 +-
 .../apache/flink/table/expressions/composite.scala |   2 +-
 .../flink/table/expressions/fieldExpression.scala  |   2 +-
 .../flink/table/sources/TableSourceUtil.scala  |  13 +-
 .../table/sources/tsextractors/ExistingField.scala | 111 -
 .../ExtendedAggregateExtractProjectRule.java   |   4 +-
 .../table/expressions/PlannerExpressionUtils.scala |   6 +-
 .../org/apache/flink/table/expressions/call.scala  |   4 +-
 .../apache/flink/table/expressions/composite.scala |   2 +-
 .../flink/table/expressions/fieldExpression.scala  |   2 +-
 .../DataStreamGroupWindowAggregateBase.scala   |   4 +-
 .../flink/table/sources/TableSourceUtil.scala  |  29 +++--
 .../table/sources/tsextractors/ExistingField.scala |  88 -
 .../flink/table/descriptors/RowtimeTest.scala  |  19 ++-
 .../table/utils/TestFilterableTableSource.scala|  22 ++--
 19 files changed, 230 insertions(+), 279 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java
new file mode 100644
index 000..0e8d73f
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.tsextractors;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.descriptors.Rowtime;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedFieldReference;
+import org.apache.flink.table.types.DataType;
+
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.typeLiteral;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedCall;
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Converts an existing {@link Long}, {@link java.sql.Timestamp}, or
+ * timestamp formatted java.lang.String field (e.g., "2018-05-28 12:34:56.000") into
+ * a rowtime attribute.
+ */
+@PublicEvolving
+public final class ExistingField extends TimestampExtractor {
+
+	private static final long serialVersionUID = 1L;
+
+	private String field;
+
+	/**
+	 * @param field The field to convert into a rowtime attribute.
+	 */
+	public ExistingField(String field) {
+		this.field = checkNotNull(field);
+	}
+
+	@Override
+	public String[] getArgumentFields() {
+		return new String[] {field};
+	}
+
+	@Override
+	public void validateArgumentFields(TypeInformation<?>[] argumentFieldTypes) {
+		DataType fieldType = fromLegacyInfoToDataType(argumentFieldTypes[0]);
+
+		switch (fieldType.getLogicalType().getTypeRoot()) {
+			case BIGINT:
+
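
For completeness, a usage sketch of the ported extractor (assuming a TableSource that implements
DefinedRowtimeAttributes; the field name "ts" and the 5 second delay are illustrative):

import java.util.Collections;
import java.util.List;

import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;

// Only the rowtime part of the table source is shown here.
class MySource implements DefinedRowtimeAttributes {

	@Override
	public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
		// Use the existing "ts" field as the rowtime attribute and emit
		// watermarks that tolerate up to 5 seconds of out-of-order records.
		return Collections.singletonList(
			new RowtimeAttributeDescriptor(
				"ts",
				new ExistingField("ts"),
				new BoundedOutOfOrderTimestamps(5000L)));
	}
}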

[flink] branch master updated (07ce142 -> 2e3aab3)

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 07ce142  [FLINK-12578][build] Add fallback unsafe MapR repository
 new 678fce0  [FLINK-13287][table-planner] Support STREAM_RECORD_TIMESTAMP call in table planner
 new 2893f3f  [FLINK-13287][table-planner] Support Reinterpret cast call in blink planner
 new 514b51d  [FLINK-13287][table-api] Port ExistingField to api-java and use new Expression in FieldComputer
 new 2e3aab3  [FLINK-13287][table-api] Port StreamRecordTimestamp to api-java

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../table/sources/tsextractors/ExistingField.java  | 136 +
 .../tsextractors/StreamRecordTimestamp.java|  75 
 .../table/expressions/ResolvedFieldReference.java  |  29 -
 .../flink/table/expressions/RexNodeConverter.java  |   2 +
 .../expressions/ExestingFieldFieldReference.scala  |  26 
 .../expressions/PlannerExpressionConverter.scala   |  12 ++
 .../table/expressions/PlannerExpressionUtils.scala |   6 +-
 .../expressions/{cast.scala => Reinterpret.scala}  |  15 +--
 .../org/apache/flink/table/expressions/call.scala  |   4 +-
 .../apache/flink/table/expressions/composite.scala |   2 +-
 .../flink/table/expressions/fieldExpression.scala  |   2 +-
 .../flink/table/sources/TableSourceUtil.scala  |  13 +-
 .../table/sources/tsextractors/ExistingField.scala | 111 -
 .../ExtendedAggregateExtractProjectRule.java   |   4 +-
 .../expressions/PlannerExpressionConverter.scala   |   4 +
 .../table/expressions/PlannerExpressionUtils.scala |   6 +-
 .../org/apache/flink/table/expressions/call.scala  |   4 +-
 .../apache/flink/table/expressions/composite.scala |   2 +-
 .../flink/table/expressions/fieldExpression.scala  |   2 +-
 .../DataStreamGroupWindowAggregateBase.scala   |   4 +-
 .../flink/table/sources/TableSourceUtil.scala  |  29 +++--
 .../table/sources/tsextractors/ExistingField.scala |  88 -
 .../tsextractors/StreamRecordTimestamp.scala   |  67 --
 .../flink/table/descriptors/RowtimeTest.scala  |  19 ++-
 .../table/utils/TestFilterableTableSource.scala|  22 ++--
 25 files changed, 331 insertions(+), 353 deletions(-)
 create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java
 create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.java
 delete mode 100644 flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExestingFieldFieldReference.scala
 copy flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/{cast.scala => Reinterpret.scala} (76%)
 delete mode 100644 flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
 delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
 delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala



[flink] 04/04: [FLINK-13287][table-api] Port StreamRecordTimestamp to api-java

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2e3aab3f6464ecae7f60a561ae7541296ce5681c
Author: JingsongLi 
AuthorDate: Wed Jul 17 20:25:04 2019 +0800

[FLINK-13287][table-api] Port StreamRecordTimestamp to api-java

This closes #9129.
---
 .../tsextractors/StreamRecordTimestamp.java| 75 ++
 .../tsextractors/StreamRecordTimestamp.scala   | 67 ---
 2 files changed, 75 insertions(+), 67 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.java
new file mode 100644
index 000..fdb4384
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.tsextractors;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.descriptors.Rowtime;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedFieldReference;
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.STREAM_RECORD_TIMESTAMP;
+
+/**
+ * Extracts the timestamp of a StreamRecord into a rowtime attribute.
+ *
+ * <p>Note: This extractor only works for StreamTableSources.
+ */
+@PublicEvolving
+public final class StreamRecordTimestamp extends TimestampExtractor {
+
+	private static final long serialVersionUID = 1L;
+
+	public static final StreamRecordTimestamp INSTANCE = new StreamRecordTimestamp();
+
+	@Override
+	public String[] getArgumentFields() {
+		return new String[0];
+	}
+
+	@Override
+	public void validateArgumentFields(TypeInformation<?>[] argumentFieldTypes) {
+	}
+
+	@Override
+	public Expression getExpression(ResolvedFieldReference[] fieldAccesses) {
+		return ApiExpressionUtils.unresolvedCall(STREAM_RECORD_TIMESTAMP);
+	}
+
+	@Override
+	public Map<String, String> toProperties() {
+		Map<String, String> map = new HashMap<>();
+		map.put(Rowtime.ROWTIME_TIMESTAMPS_TYPE, Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE);
+		return map;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		return this == o || o != null && getClass() == o.getClass();
+	}
+
+	@Override
+	public int hashCode() {
+		return this.getClass().hashCode();
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
deleted file mode 100644
index 087b1a6..000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing 
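
As context for the from-source property written by toProperties() above: the same configuration can
be reached through the Rowtime descriptor. A small sketch (the surrounding connector and schema
definition are omitted):

import org.apache.flink.table.descriptors.Rowtime;

class RowtimeSketch {
	// Declares that rowtime timestamps come from the StreamRecord itself,
	// i.e. the descriptor counterpart of StreamRecordTimestamp.INSTANCE.
	static Rowtime fromSourceRowtime() {
		return new Rowtime().timestampsFromSource();
	}
}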

[flink] 01/04: [FLINK-13287][table-planner] Support STREAM_RECORD_TIMESTAMP call in table planner

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 678fce06587a3df4693acbcc5f3a9fb32396aabe
Author: JingsongLi 
AuthorDate: Wed Jul 17 20:18:56 2019 +0800

[FLINK-13287][table-planner] Support STREAM_RECORD_TIMESTAMP call in table planner
---
 .../java/org/apache/flink/table/expressions/RexNodeConverter.java | 2 ++
 .../apache/flink/table/expressions/PlannerExpressionConverter.scala   | 4 
 .../apache/flink/table/expressions/PlannerExpressionConverter.scala   | 4 
 3 files changed, 10 insertions(+)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
index 2c6ef4d..5528571 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
@@ -289,6 +289,8 @@ public class RexNodeConverter implements ExpressionVisitor<RexNode> {
 		conversionsOfBuiltInFunc
 			.put(BuiltInFunctionDefinitions.SHA512, exprs -> convert(FlinkSqlOperatorTable.SHA512, exprs));
 		conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.SHA1, exprs -> convert(FlinkSqlOperatorTable.SHA1, exprs));
+		conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.STREAM_RECORD_TIMESTAMP,
+			exprs -> convert(FlinkSqlOperatorTable.STREAMRECORD_TIMESTAMP, exprs));
 	}
 
@Override
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index d52d6e6a..208cad9 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -683,6 +683,10 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
         assert(args.isEmpty)
         CurrentRow()
 
+      case STREAM_RECORD_TIMESTAMP =>
+        assert(args.isEmpty)
+        StreamRecordTimestamp()
+
       case _ =>
         throw new TableException(s"Unsupported function definition: $fd")
     }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index 999fa56..5684594 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -682,6 +682,10 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
         assert(args.isEmpty)
         CurrentRow()
 
+      case STREAM_RECORD_TIMESTAMP =>
+        assert(args.isEmpty)
+        StreamRecordTimestamp()
+
       case _ =>
         throw new TableException(s"Unsupported function definition: $fd")
     }
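
Both converters dispatch built-in functions through a registration map like the one populated in
RexNodeConverter above. A rough, self-contained model of that pattern (hypothetical class, not
Flink API; the real converter differs in detail):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class ConversionTable<D, E, R> {
	// Each function definition maps to a converter over its operand list.
	private final Map<D, Function<List<E>, R>> conversions = new HashMap<>();

	void register(D definition, Function<List<E>, R> converter) {
		conversions.put(definition, converter);
	}

	R convert(D definition, List<E> operands) {
		Function<List<E>, R> converter = conversions.get(definition);
		if (converter == null) {
			// Mirrors the "Unsupported function definition" fallback above.
			throw new UnsupportedOperationException("Unsupported function definition: " + definition);
		}
		return converter.apply(operands);
	}
}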



[flink] branch release-1.9 updated (41e56d9 -> c43dc95)

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 41e56d9  [FLINK-12578][build] Add fallback unsafe MapR repository
 new 07e70a9  [FLINK-13287][table-planner] Support STREAM_RECORD_TIMESTAMP call in table planner
 new fa5c867  [FLINK-13287][table-planner] Support Reinterpret cast call in blink planner
 new 121e35e  [FLINK-13287][table-api] Port ExistingField to api-java and use new Expression in FieldComputer
 new c43dc95  [FLINK-13287][table-api] Port StreamRecordTimestamp to api-java

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../table/sources/tsextractors/ExistingField.java  | 136 +
 .../tsextractors/StreamRecordTimestamp.java|  75 
 .../table/expressions/ResolvedFieldReference.java  |  29 -
 .../flink/table/expressions/RexNodeConverter.java  |   2 +
 .../expressions/ExestingFieldFieldReference.scala  |  26 
 .../expressions/PlannerExpressionConverter.scala   |  12 ++
 .../table/expressions/PlannerExpressionUtils.scala |   6 +-
 .../expressions/{cast.scala => Reinterpret.scala}  |  15 +--
 .../org/apache/flink/table/expressions/call.scala  |   4 +-
 .../apache/flink/table/expressions/composite.scala |   2 +-
 .../flink/table/expressions/fieldExpression.scala  |   2 +-
 .../flink/table/sources/TableSourceUtil.scala  |  13 +-
 .../table/sources/tsextractors/ExistingField.scala | 111 -
 .../ExtendedAggregateExtractProjectRule.java   |   4 +-
 .../expressions/PlannerExpressionConverter.scala   |   4 +
 .../table/expressions/PlannerExpressionUtils.scala |   6 +-
 .../org/apache/flink/table/expressions/call.scala  |   4 +-
 .../apache/flink/table/expressions/composite.scala |   2 +-
 .../flink/table/expressions/fieldExpression.scala  |   2 +-
 .../DataStreamGroupWindowAggregateBase.scala   |   4 +-
 .../flink/table/sources/TableSourceUtil.scala  |  29 +++--
 .../table/sources/tsextractors/ExistingField.scala |  88 -
 .../tsextractors/StreamRecordTimestamp.scala   |  67 --
 .../flink/table/descriptors/RowtimeTest.scala  |  19 ++-
 .../table/utils/TestFilterableTableSource.scala|  22 ++--
 25 files changed, 331 insertions(+), 353 deletions(-)
 create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java
 create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.java
 delete mode 100644 flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExestingFieldFieldReference.scala
 copy flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/{cast.scala => Reinterpret.scala} (76%)
 delete mode 100644 flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
 delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
 delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala



[flink] 01/04: [FLINK-13287][table-planner] Support STREAM_RECORD_TIMESTAMP call in table planner

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 07e70a927ee36cd5eda85eece6b221086fe335ff
Author: JingsongLi 
AuthorDate: Wed Jul 17 20:18:56 2019 +0800

[FLINK-13287][table-planner] Support STREAM_RECORD_TIMESTAMP call in table planner
---
 .../java/org/apache/flink/table/expressions/RexNodeConverter.java | 2 ++
 .../apache/flink/table/expressions/PlannerExpressionConverter.scala   | 4 
 .../apache/flink/table/expressions/PlannerExpressionConverter.scala   | 4 
 3 files changed, 10 insertions(+)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
index 2c6ef4d..5528571 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
@@ -289,6 +289,8 @@ public class RexNodeConverter implements ExpressionVisitor<RexNode> {
 		conversionsOfBuiltInFunc
 			.put(BuiltInFunctionDefinitions.SHA512, exprs -> convert(FlinkSqlOperatorTable.SHA512, exprs));
 		conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.SHA1, exprs -> convert(FlinkSqlOperatorTable.SHA1, exprs));
+		conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.STREAM_RECORD_TIMESTAMP,
+			exprs -> convert(FlinkSqlOperatorTable.STREAMRECORD_TIMESTAMP, exprs));
 	}
 
@Override
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index d52d6e6a..208cad9 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -683,6 +683,10 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
         assert(args.isEmpty)
         CurrentRow()
 
+      case STREAM_RECORD_TIMESTAMP =>
+        assert(args.isEmpty)
+        StreamRecordTimestamp()
+
       case _ =>
         throw new TableException(s"Unsupported function definition: $fd")
     }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index 999fa56..5684594 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -682,6 +682,10 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
         assert(args.isEmpty)
         CurrentRow()
 
+      case STREAM_RECORD_TIMESTAMP =>
+        assert(args.isEmpty)
+        StreamRecordTimestamp()
+
       case _ =>
         throw new TableException(s"Unsupported function definition: $fd")
     }



[flink] 02/04: [FLINK-13287][table-planner] Support Reinterpret cast call in blink planner

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fa5c8674a6a2bcdbcc7ed898e98a89e3b2f92c50
Author: JingsongLi 
AuthorDate: Wed Jul 17 20:20:22 2019 +0800

[FLINK-13287][table-planner] Support Reinterpret cast call in blink planner
---
 .../expressions/PlannerExpressionConverter.scala   |  8 
 .../flink/table/expressions/Reinterpret.scala  | 45 ++
 2 files changed, 53 insertions(+)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
index 208cad9..8b5dada 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionConverter.scala
@@ -56,6 +56,14 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp
           fromDataTypeToLegacyInfo(
             children(1).asInstanceOf[TypeLiteralExpression].getOutputDataType))
 
+      case REINTERPRET_CAST =>
+        assert(children.size == 3)
+        Reinterpret(
+          children.head.accept(this),
+          fromDataTypeToLegacyInfo(
+            children(1).asInstanceOf[TypeLiteralExpression].getOutputDataType),
+          getValue[Boolean](children(2).accept(this)))
+
       case WINDOW_START =>
         assert(children.size == 1)
         val windowReference = translateWindowReference(children.head)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/Reinterpret.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/Reinterpret.scala
new file mode 100644
index 000..530fd3c
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/Reinterpret.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
+import org.apache.flink.table.typeutils.TypeCoercion
+import org.apache.flink.table.validate._
+
+case class Reinterpret(child: PlannerExpression, resultType: TypeInformation[_],
+    checkOverflow: Boolean) extends UnaryExpression {
+
+  override def toString = s"$child.reinterpret($resultType)"
+
+  override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
+    val child: PlannerExpression = anyRefs.head.asInstanceOf[PlannerExpression]
+    copy(child, resultType).asInstanceOf[this.type]
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (TypeCoercion.canReinterpret(
+      fromTypeInfoToLogicalType(child.resultType), fromTypeInfoToLogicalType(resultType))) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Unsupported reinterpret from ${child.resultType} to $resultType")
+    }
+  }
+}
+



[flink] 04/04: [FLINK-13287][table-api] Port StreamRecordTimestamp to api-java

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c43dc951bf52e2f76a9c17021cfdf1ca8bc54781
Author: JingsongLi 
AuthorDate: Wed Jul 17 20:25:04 2019 +0800

[FLINK-13287][table-api] Port StreamRecordTimestamp to api-java

This closes #9129.
---
 .../tsextractors/StreamRecordTimestamp.java| 75 ++
 .../tsextractors/StreamRecordTimestamp.scala   | 67 ---
 2 files changed, 75 insertions(+), 67 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.java
new file mode 100644
index 000..fdb4384
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.tsextractors;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.descriptors.Rowtime;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedFieldReference;
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.STREAM_RECORD_TIMESTAMP;
+
+/**
+ * Extracts the timestamp of a StreamRecord into a rowtime attribute.
+ *
+ * <p>Note: This extractor only works for StreamTableSources.
+ */
+@PublicEvolving
+public final class StreamRecordTimestamp extends TimestampExtractor {
+
+	private static final long serialVersionUID = 1L;
+
+	public static final StreamRecordTimestamp INSTANCE = new StreamRecordTimestamp();
+
+	@Override
+	public String[] getArgumentFields() {
+		return new String[0];
+	}
+
+	@Override
+	public void validateArgumentFields(TypeInformation<?>[] argumentFieldTypes) {
+	}
+
+	@Override
+	public Expression getExpression(ResolvedFieldReference[] fieldAccesses) {
+		return ApiExpressionUtils.unresolvedCall(STREAM_RECORD_TIMESTAMP);
+	}
+
+	@Override
+	public Map<String, String> toProperties() {
+		Map<String, String> map = new HashMap<>();
+		map.put(Rowtime.ROWTIME_TIMESTAMPS_TYPE, Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE);
+		return map;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		return this == o || o != null && getClass() == o.getClass();
+	}
+
+	@Override
+	public int hashCode() {
+		return this.getClass().hashCode();
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
deleted file mode 100644
index 087b1a6..000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/tsextractors/StreamRecordTimestamp.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing 

[flink] 03/04: [FLINK-13287][table-api] Port ExistingField to api-java and use new Expression in FieldComputer

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 121e35ec66dbaf40ca763e1a8551c480cea03215
Author: JingsongLi 
AuthorDate: Wed Jul 17 20:23:37 2019 +0800

[FLINK-13287][table-api] Port ExistingField to api-java and use new Expression in FieldComputer
---
 .../table/sources/tsextractors/ExistingField.java  | 136 +
 .../table/expressions/ResolvedFieldReference.java  |  29 -
 .../expressions/ExestingFieldFieldReference.scala  |  26 
 .../table/expressions/PlannerExpressionUtils.scala |   6 +-
 .../org/apache/flink/table/expressions/call.scala  |   4 +-
 .../apache/flink/table/expressions/composite.scala |   2 +-
 .../flink/table/expressions/fieldExpression.scala  |   2 +-
 .../flink/table/sources/TableSourceUtil.scala  |  13 +-
 .../table/sources/tsextractors/ExistingField.scala | 111 -
 .../ExtendedAggregateExtractProjectRule.java   |   4 +-
 .../table/expressions/PlannerExpressionUtils.scala |   6 +-
 .../org/apache/flink/table/expressions/call.scala  |   4 +-
 .../apache/flink/table/expressions/composite.scala |   2 +-
 .../flink/table/expressions/fieldExpression.scala  |   2 +-
 .../DataStreamGroupWindowAggregateBase.scala   |   4 +-
 .../flink/table/sources/TableSourceUtil.scala  |  29 +++--
 .../table/sources/tsextractors/ExistingField.scala |  88 -
 .../flink/table/descriptors/RowtimeTest.scala  |  19 ++-
 .../table/utils/TestFilterableTableSource.scala|  22 ++--
 19 files changed, 230 insertions(+), 279 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java
new file mode 100644
index 000..0e8d73f
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/sources/tsextractors/ExistingField.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.tsextractors;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.descriptors.Rowtime;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedFieldReference;
+import org.apache.flink.table.types.DataType;
+
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.typeLiteral;
+import static org.apache.flink.table.expressions.utils.ApiExpressionUtils.unresolvedCall;
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Converts an existing {@link Long}, {@link java.sql.Timestamp}, or
+ * timestamp formatted java.lang.String field (e.g., "2018-05-28 12:34:56.000") into
+ * a rowtime attribute.
+ */
+@PublicEvolving
+public final class ExistingField extends TimestampExtractor {
+
+	private static final long serialVersionUID = 1L;
+
+	private String field;
+
+	/**
+	 * @param field The field to convert into a rowtime attribute.
+	 */
+	public ExistingField(String field) {
+		this.field = checkNotNull(field);
+	}
+
+	@Override
+	public String[] getArgumentFields() {
+		return new String[] {field};
+	}
+
+	@Override
+	public void validateArgumentFields(TypeInformation<?>[] argumentFieldTypes) {
+		DataType fieldType = fromLegacyInfoToDataType(argumentFieldTypes[0]);
+
+		switch (fieldType.getLogicalType().getTypeRoot()) {
+			case BIGINT:
+   

[flink] branch master updated: [FLINK-13315][api-java] Port wmstrategies to api-java-bridge

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new de1a8a0  [FLINK-13315][api-java] Port wmstrategies to api-java-bridge
de1a8a0 is described below

commit de1a8a0444c231df6c57a70cefb689aa7126a502
Author: JingsongLi 
AuthorDate: Wed Jul 17 20:54:30 2019 +0800

[FLINK-13315][api-java] Port wmstrategies to api-java-bridge

This closes #9153.
---
 .../sources/wmstrategies/AscendingTimestamps.java  | 71 +
 .../wmstrategies/BoundedOutOfOrderTimestamps.java  | 88 ++
 .../wmstrategies/PeriodicWatermarkAssigner.java| 44 +++
 .../wmstrategies/PunctuatedWatermarkAssigner.java  | 40 ++
 .../sources/wmstrategies/PreserveWatermarks.java   | 53 +
 .../stream/StreamExecTableSourceScan.scala | 23 --
 .../sources/wmstrategies/AscendingTimestamps.scala | 49 
 .../sources/wmstrategies/watermarkStrategies.scala | 81 
 .../sources/wmstrategies/AscendingTimestamps.scala | 60 ---
 .../wmstrategies/BoundedOutOfOrderTimestamps.scala | 63 
 .../sources/wmstrategies/watermarkStrategies.scala | 80 
 .../table/dataformat/DataFormatConverters.java |  1 -
 12 files changed, 313 insertions(+), 340 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.java
new file mode 100644
index 000..4b85007
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.wmstrategies;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.table.descriptors.Rowtime;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A watermark strategy for ascending rowtime attributes.
+ *
+ * <p>Emits a watermark of the maximum observed timestamp so far minus 1.
+ * Rows that have a timestamp equal to the max timestamp are not late.
+ */
+@PublicEvolving
+public final class AscendingTimestamps extends PeriodicWatermarkAssigner {
+
+	private static final long serialVersionUID = 1L;
+
+	private long maxTimestamp = Long.MIN_VALUE + 1;
+
+	@Override
+	public void nextTimestamp(long timestamp) {
+		if (timestamp > maxTimestamp) {
+			maxTimestamp = timestamp;
+		}
+	}
+
+	@Override
+	public Map<String, String> toProperties() {
+		Map<String, String> map = new HashMap<>();
+		map.put(
+			Rowtime.ROWTIME_WATERMARKS_TYPE,
+			Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING);
+		return map;
+	}
+
+	@Override
+	public int hashCode() {
+		return AscendingTimestamps.class.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof AscendingTimestamps;
+	}
+
+	@Override
+	public Watermark getWatermark() {
+		return new Watermark(maxTimestamp - 1);
+	}
+}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.java
new file mode 100644
index 000..725f534
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ 
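
To illustrate the semantics of the strategy above, a self-contained sketch (not part of the commit)
that feeds ascending timestamps and reads the resulting watermark:

import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;

class AscendingTimestampsDemo {
	public static void main(String[] args) {
		AscendingTimestamps strategy = new AscendingTimestamps();
		// Observe three ascending record timestamps.
		strategy.nextTimestamp(1000L);
		strategy.nextTimestamp(2000L);
		strategy.nextTimestamp(3000L);
		// The watermark is the maximum observed timestamp minus 1, so rows
		// with timestamp == 3000 are still on time.
		System.out.println(strategy.getWatermark().getTimestamp()); // 2999
	}
}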

[flink] branch release-1.9 updated: [FLINK-13315][api-java] Port wmstrategies to api-java-bridge

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new 66edf7d  [FLINK-13315][api-java] Port wmstrategies to api-java-bridge
66edf7d is described below

commit 66edf7d386817c7939fedcf73c926e94034eaa0a
Author: JingsongLi 
AuthorDate: Wed Jul 17 20:54:30 2019 +0800

[FLINK-13315][api-java] Port wmstrategies to api-java-bridge

This closes #9153.
---
 .../sources/wmstrategies/AscendingTimestamps.java  | 71 +
 .../wmstrategies/BoundedOutOfOrderTimestamps.java  | 88 ++
 .../wmstrategies/PeriodicWatermarkAssigner.java| 44 +++
 .../wmstrategies/PunctuatedWatermarkAssigner.java  | 40 ++
 .../sources/wmstrategies/PreserveWatermarks.java   | 53 +
 .../stream/StreamExecTableSourceScan.scala | 23 --
 .../sources/wmstrategies/AscendingTimestamps.scala | 49 
 .../sources/wmstrategies/watermarkStrategies.scala | 81 
 .../sources/wmstrategies/AscendingTimestamps.scala | 60 ---
 .../wmstrategies/BoundedOutOfOrderTimestamps.scala | 63 
 .../sources/wmstrategies/watermarkStrategies.scala | 80 
 .../table/dataformat/DataFormatConverters.java |  1 -
 12 files changed, 313 insertions(+), 340 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.java
new file mode 100644
index 000..4b85007
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/AscendingTimestamps.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.wmstrategies;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.table.descriptors.Rowtime;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A watermark strategy for ascending rowtime attributes.
+ *
+ * <p>Emits a watermark of the maximum observed timestamp so far minus 1.
+ * Rows that have a timestamp equal to the max timestamp are not late.
+ */
+@PublicEvolving
+public final class AscendingTimestamps extends PeriodicWatermarkAssigner {
+
+	private static final long serialVersionUID = 1L;
+
+	private long maxTimestamp = Long.MIN_VALUE + 1;
+
+	@Override
+	public void nextTimestamp(long timestamp) {
+		if (timestamp > maxTimestamp) {
+			maxTimestamp = timestamp;
+		}
+	}
+
+	@Override
+	public Map<String, String> toProperties() {
+		Map<String, String> map = new HashMap<>();
+		map.put(
+			Rowtime.ROWTIME_WATERMARKS_TYPE,
+			Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING);
+		return map;
+	}
+
+	@Override
+	public int hashCode() {
+		return AscendingTimestamps.class.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof AscendingTimestamps;
+	}
+
+	@Override
+	public Watermark getWatermark() {
+		return new Watermark(maxTimestamp - 1);
+	}
+}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.java
new file mode 100644
index 000..725f534
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the 

[flink] 04/06: [FLINK-13078][table-common] Simplify serializable string representation for parsers

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f6c65fc2d8e2817b823172a65a54df9a0361371a
Author: Timo Walther 
AuthorDate: Wed Jul 10 08:46:29 2019 +0200

[FLINK-13078][table-common] Simplify serializable string representation for parsers
---
 .../java/org/apache/flink/table/types/logical/AnyType.java   |  4 ++--
 .../org/apache/flink/table/types/logical/SymbolType.java |  2 +-
 .../flink/table/types/logical/TypeInformationAnyType.java|  2 +-
 .../java/org/apache/flink/table/types/LogicalTypesTest.java  | 12 ++--
 4 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java
index ba849ca..82e805d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java
@@ -35,7 +35,7 @@ import java.util.Set;
  * Logical type of an arbitrary serialized type. This type is a black box within the table ecosystem
  * and is only deserialized at the edges. The any type is an extension to the SQL standard.
  *
- * <p>The serialized string representation is {@code ANY(c, s)} where {@code c} is the originating
+ * <p>The serialized string representation is {@code ANY('c', 's')} where {@code c} is the originating
  * class and {@code s} is the serialized {@link TypeSerializerSnapshot} in Base64 encoding.
  *
  * @param <T> originating class for this type
@@ -43,7 +43,7 @@ import java.util.Set;
 @PublicEvolving
 public final class AnyType<T> extends LogicalType {
 
-	private static final String FORMAT = "ANY(%s, %s)";
+	private static final String FORMAT = "ANY('%s', '%s')";
 
 	private static final Set<String> INPUT_OUTPUT_CONVERSION = conversionSet(
 		byte[].class.getName(),
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/SymbolType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/SymbolType.java
index c57857c..643e10f 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/SymbolType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/SymbolType.java
@@ -40,7 +40,7 @@ import java.util.Objects;
 @PublicEvolving
 public final class SymbolType<T extends TableSymbol> extends LogicalType {
 
-	private static final String FORMAT = "SYMBOL(%s)";
+	private static final String FORMAT = "SYMBOL('%s')";
 
 	private final Class<T> symbolClass;
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java
index 2c1c0dc..9e64044 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java
@@ -49,7 +49,7 @@ import java.util.Set;
 @PublicEvolving
 public final class TypeInformationAnyType<T> extends LogicalType {
 
-	private static final String FORMAT = "ANY(%s, ?)";
+	private static final String FORMAT = "ANY('%s', ?)";
 
 	private static final Set<String> INPUT_OUTPUT_CONVERSION = conversionSet(
 		byte[].class.getName(),
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
index edf28cf..49feb47 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -557,7 +557,7 @@ public class LogicalTypesTest {
 
 		testEquality(anyType, new TypeInformationAnyType<>(Types.TUPLE(Types.STRING, Types.LONG)));
 
-		testStringSummary(anyType, "ANY(org.apache.flink.api.java.tuple.Tuple2, ?)");
+		testStringSummary(anyType, "ANY('org.apache.flink.api.java.tuple.Tuple2', ?)");
 
 		testNullability(anyType);
 
@@ -572,8 +572,8 @@ public class LogicalTypesTest {
 	public void testAnyType() {
 		testAll(
 			new AnyType<>(Human.class, new KryoSerializer<>(Human.class, new ExecutionConfig())),
-			"ANY(org.apache.flink.table.types.LogicalTypesTest$Human, " +
-				"AEdvcmcuYXBhY2hlLmZsaW5rLmFwaS5qYXZhLnR5cGV1dGlscy5ydW50aW1lLmtyeW8uS3J5b1Nlcml" +
+

[flink] 06/06: [FLINK-13078][table-common] Add a logical type parser

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 479fb070edc4a681d85e4c20a964083751aa3720
Author: Timo Walther 
AuthorDate: Wed Jul 10 08:52:30 2019 +0200

[FLINK-13078][table-common] Add a logical type parser

This adds a parser for all logical types defined in FLIP-37.

This closes #9061.
---
 .../flink/table/types/logical/LogicalType.java |   3 +
 .../types/logical/utils/LogicalTypeParser.java | 900 +
 .../apache/flink/table/utils/EncodingUtils.java|   6 +-
 .../apache/flink/table/utils/TypeStringUtils.java  |  12 +-
 .../flink/table/types/LogicalTypeParserTest.java   | 519 
 5 files changed, 1437 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
index 4e4942a..cc46533 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.types.logical;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
 import org.apache.flink.table.types.logical.utils.LogicalTypeGeneralization;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
@@ -98,6 +99,8 @@ public abstract class LogicalType implements Serializable {
	 * Returns a string that fully serializes this instance. The serialized string can be used for
	 * transmitting or persisting a type.
	 *
+	 * <p>See {@link LogicalTypeParser} for the reverse operation.
+	 *
	 * @return detailed string for transmission or persistence
 */
public abstract String asSerializableString();
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
new file mode 100644
index 000..b6fcd07
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
@@ -0,0 +1,900 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.AnyType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
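
The parser is the inverse of LogicalType#asSerializableString, so a round trip like the following
should hold (a small sketch based on the API added here; the ROW type string is an example):

import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeParser;

class ParserRoundTrip {
	public static void main(String[] args) {
		// Parse a serializable type string back into a LogicalType...
		LogicalType parsed = LogicalTypeParser.parse("ROW<`id` BIGINT, `name` VARCHAR(200)>");
		// ...and serializing it again yields an equivalent string.
		System.out.println(parsed.asSerializableString());
	}
}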

[flink] 01/06: [hotfix][table-common] Fix minor typos in ObjectIdentifier

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3f62d5d78a3f792e3fb984ff88d38190dabad586
Author: Timo Walther 
AuthorDate: Wed Jul 10 08:40:47 2019 +0200

[hotfix][table-common] Fix minor typos in ObjectIdentifier
---
 .../java/org/apache/flink/table/catalog/ObjectIdentifier.java | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectIdentifier.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectIdentifier.java
index b791a70..6e2f9c7 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectIdentifier.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectIdentifier.java
@@ -33,15 +33,15 @@ import static 
org.apache.flink.table.utils.EncodingUtils.escapeIdentifier;
  * While {@link ObjectPath} is used within the same catalog, instances of 
this class can be used
  * across catalogs.
  *
- * Two objects are considered equal if they share the same type identifier 
in a stable session context.
+ * Two objects are considered equal if they share the same object 
identifier in a stable session context.
  */
 public final class ObjectIdentifier implements Serializable {
 
-   private String catalogName;
+   private final String catalogName;
 
-   private String databaseName;
+   private final String databaseName;
 
-   private String objectName;
+   private final String objectName;
 
public static ObjectIdentifier of(String catalogName, String 
databaseName, String objectName) {
return new ObjectIdentifier(catalogName, databaseName, 
objectName);
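
For illustration, a minimal sketch of how the identifier behaves after this
change, using the ObjectIdentifier.of(...) factory shown above and assuming an
equals()/hashCode() over the three name fields (implied by the javadoc, but not
shown in this excerpt):

    import org.apache.flink.table.catalog.ObjectIdentifier;

    public class ObjectIdentifierSketch {
        public static void main(String[] args) {
            // Same catalog, database, and object name.
            ObjectIdentifier a = ObjectIdentifier.of("cat", "db", "orders");
            ObjectIdentifier b = ObjectIdentifier.of("cat", "db", "orders");

            // Within a stable session context the two are considered equal.
            System.out.println(a.equals(b)); // expected: true

            // The now-final fields make instances immutable, so they are
            // safe to share across catalogs, e.g. as map keys.
        }
    }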



[flink] 03/06: [hotfix][table-common] Link timestamp precisions

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8f3e7b91e0d2d1991bbc77d227a71fff751cac0d
Author: Timo Walther 
AuthorDate: Wed Jul 10 08:44:08 2019 +0200

[hotfix][table-common] Link timestamp precisions
---
 .../apache/flink/table/types/logical/LocalZonedTimestampType.java   | 6 +++---
 .../org/apache/flink/table/types/logical/ZonedTimestampType.java| 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java
index d0c237c..90a8ed9 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java
@@ -53,11 +53,11 @@ import java.util.Set;
 @PublicEvolving
 public final class LocalZonedTimestampType extends LogicalType {
 
-   public static final int MIN_PRECISION = 0;
+   public static final int MIN_PRECISION = TimestampType.MIN_PRECISION;
 
-   public static final int MAX_PRECISION = 9;
+   public static final int MAX_PRECISION = TimestampType.MAX_PRECISION;
 
-   public static final int DEFAULT_PRECISION = 6;
+   public static final int DEFAULT_PRECISION = 
TimestampType.DEFAULT_PRECISION;
 
private static final String FORMAT = "TIMESTAMP(%d) WITH LOCAL TIME 
ZONE";
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ZonedTimestampType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ZonedTimestampType.java
index 22e5538..d6a4464 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ZonedTimestampType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ZonedTimestampType.java
@@ -49,11 +49,11 @@ import java.util.Set;
 @PublicEvolving
 public final class ZonedTimestampType extends LogicalType {
 
-   public static final int MIN_PRECISION = 0;
+   public static final int MIN_PRECISION = TimestampType.MIN_PRECISION;
 
-   public static final int MAX_PRECISION = 9;
+   public static final int MAX_PRECISION = TimestampType.MAX_PRECISION;
 
-   public static final int DEFAULT_PRECISION = 6;
+   public static final int DEFAULT_PRECISION = 
TimestampType.DEFAULT_PRECISION;
 
private static final String FORMAT = "TIMESTAMP(%d) WITH TIME ZONE";
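
For illustration, what linking the constants implies; this sketch only uses the
public constants touched by the diff above:

    import org.apache.flink.table.types.logical.LocalZonedTimestampType;
    import org.apache.flink.table.types.logical.TimestampType;
    import org.apache.flink.table.types.logical.ZonedTimestampType;

    public class PrecisionBoundsSketch {
        public static void main(String[] args) {
            // All three timestamp variants now share a single precision
            // range defined once on TimestampType: 0..9, default 6.
            System.out.println(TimestampType.MIN_PRECISION);           // 0
            System.out.println(LocalZonedTimestampType.MAX_PRECISION); // 9
            System.out.println(ZonedTimestampType.DEFAULT_PRECISION);  // 6
        }
    }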
 



[flink] 02/06: [hotfix][table-common] Update list of synonyms for logical types

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 178a7ea08cbff001375734a24e9e20fd94006fac
Author: Timo Walther 
AuthorDate: Wed Jul 10 08:43:18 2019 +0200

[hotfix][table-common] Update list of synonyms for logical types
---
 .../main/java/org/apache/flink/table/types/logical/DecimalType.java| 3 ++-
 .../src/main/java/org/apache/flink/table/types/logical/DoubleType.java | 3 ++-
 .../src/main/java/org/apache/flink/table/types/logical/IntType.java| 2 +-
 .../src/main/java/org/apache/flink/table/types/logical/RowType.java| 2 +-
 .../src/main/java/org/apache/flink/table/types/logical/TimeType.java   | 3 ++-
 .../main/java/org/apache/flink/table/types/logical/TimestampType.java  | 3 ++-
 6 files changed, 10 insertions(+), 6 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DecimalType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DecimalType.java
index c11ede0..e91793d 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DecimalType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DecimalType.java
@@ -34,7 +34,8 @@ import java.util.Set;
  * digits in a number (=precision) and {@code s} is the number of digits to 
the right of the decimal
  * point in a number (=scale). {@code p} must have a value between 1 and 38 
(both inclusive). {@code s}
  * must have a value between 0 and {@code p} (both inclusive). The default 
value for {@code p} is 10.
- * The default value for {@code s} is 0.
+ * The default value for {@code s} is 0. {@code NUMERIC(p, s)} and {@code 
DEC(p, s)} are synonyms for
+ * this type.
  */
 @PublicEvolving
 public final class DecimalType extends LogicalType {
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DoubleType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DoubleType.java
index 90d1429..ea4f5b4 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DoubleType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DoubleType.java
@@ -27,7 +27,8 @@ import java.util.Set;
 /**
  * Logical type of an 8-byte double precision floating point number.
  *
- * The serialized string representation is {@code DOUBLE}.
+ * The serialized string representation is {@code DOUBLE}. {@code DOUBLE 
PRECISION} is a synonym
+ * for this type.
  */
 @PublicEvolving
 public final class DoubleType extends LogicalType {
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/IntType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/IntType.java
index 567d8ef..f362b36 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/IntType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/IntType.java
@@ -27,7 +27,7 @@ import java.util.Set;
 /**
  * Logical type of a 4-byte signed integer with values from -2,147,483,648 to 
2,147,483,647.
  *
- * The serialized string representation is {@code INT}.
+ * The serialized string representation is {@code INT}. {@code INTEGER} is 
a synonym for this type.
  */
 @PublicEvolving
 public final class IntType extends LogicalType {
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
index 5a95e7b..c3a5cd1 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
@@ -47,7 +47,7 @@ import static 
org.apache.flink.table.utils.EncodingUtils.escapeSingleQuotes;
  *
  * The serialized string representation is {@code ROW<n0 t0 'd0', n1 t1 'd1', ...>} where
  * {@code n} is the unique name of a field, {@code t} is the logical type of a 
field, {@code d} is
- * the description of a field.
+ * the description of a field. {@code ROW(...)} is a synonym for being closer 
to the SQL standard.
  */
 @PublicEvolving
 public final class RowType extends LogicalType {
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java
index 15763d3..239eed5 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java
@@ -35,7 +35,8 @@ import java.util.Set;
  *
  * The serialized 
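
The synonyms documented above become observable once type strings are parsed
back (see the LogicalTypeParser commit later in this digest). A minimal sketch,
assuming the static LogicalTypeParser.parse(String) entry point accepts the
synonym spellings as well as the canonical ones:

    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.utils.LogicalTypeParser;

    public class SynonymSketch {
        public static void main(String[] args) {
            // A documented synonym parses to the same logical type as the
            // canonical spelling.
            LogicalType decimal = LogicalTypeParser.parse("DECIMAL(10, 2)");
            LogicalType numeric = LogicalTypeParser.parse("NUMERIC(10, 2)");
            System.out.println(decimal.equals(numeric)); // expected: true

            // Serialization always emits the canonical spelling.
            LogicalType dbl = LogicalTypeParser.parse("DOUBLE PRECISION");
            System.out.println(dbl.asSerializableString()); // DOUBLE
        }
    }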

[flink] 05/06: [FLINK-13078][table-common] Add an unresolved user-defined logical type

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 29fcd0c65900256222e954b7d91743053c379694
Author: Timo Walther 
AuthorDate: Wed Jul 10 08:51:22 2019 +0200

[FLINK-13078][table-common] Add an unresolved user-defined logical type
---
 .../flink/table/types/logical/LogicalTypeRoot.java |   3 +
 .../table/types/logical/LogicalTypeVisitor.java|   4 +-
 .../types/logical/UnresolvedUserDefinedType.java   | 153 +
 .../apache/flink/table/types/LogicalTypesTest.java |  11 ++
 4 files changed, 169 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
index 4725876..6ff360a 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
@@ -156,6 +156,9 @@ public enum LogicalTypeRoot {
LogicalTypeFamily.EXTENSION),
 
SYMBOL(
+   LogicalTypeFamily.EXTENSION),
+
+   UNRESOLVED(
LogicalTypeFamily.EXTENSION);
 
	private final Set<LogicalTypeFamily> families;
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
index 5bf5858..810174f 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
@@ -24,8 +24,8 @@ import org.apache.flink.annotation.PublicEvolving;
  * The visitor definition of {@link LogicalType}. The visitor transforms a 
logical type into
  * instances of {@code R}.
  *
- * Incomplete types such as the {@link TypeInformationAnyType} are visited 
through the generic
- * {@link #visit(LogicalType)}.
+ * Incomplete types such as the {@link TypeInformationAnyType} or {@link 
UnresolvedUserDefinedType} are visited
+ * through the generic {@link #visit(LogicalType)}.
  *
  * @param <R> result type
  */
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UnresolvedUserDefinedType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UnresolvedUserDefinedType.java
new file mode 100644
index 000..2168c8e
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UnresolvedUserDefinedType.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Placeholder type of an unresolved user-defined type that is identified by a 
partially or fully
+ * qualified path ({@code [catalog].[database].[type]}).
+ *
+ * It assumes that a type has been registered in a catalog and just needs 
to be resolved to a
+ * {@link DistinctType} or {@link StructuredType}.
+ *
+ * Two unresolved types are considered equal if they share the same path in 
a stable session context.
+ *
+ * @see UserDefinedType
+ */
+@PublicEvolving
+public final class UnresolvedUserDefinedType extends LogicalType {
+
+   private final @Nullable String catalog;
+
+   private final @Nullable String database;
+
+   private final String typeIdentifier;
+
+   public UnresolvedUserDefinedType(
+   boolean isNullable,
+   @Nullable String catalog,
+   @Nullable 
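
The excerpt above is cut off by the archive; assuming the constructor continues
with the database name and the type identifier (matching the field list shown),
a minimal sketch of how such a placeholder would be created. The path
"db"/"money" is a made-up example:

    import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;

    public class UnresolvedTypeSketch {
        public static void main(String[] args) {
            // Partially qualified path: no catalog, database "db", type
            // name "money". Resolution against a catalog happens later.
            UnresolvedUserDefinedType t =
                new UnresolvedUserDefinedType(true, null, "db", "money");

            // Per the class javadoc, the placeholder still needs to be
            // resolved to a DistinctType or StructuredType before it can
            // be used like a fully defined logical type.
            System.out.println(t.asSummaryString());
        }
    }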

[flink] 04/06: [FLINK-13078][table-common] Simplify serializable string representation for parsers

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e750efc0a84c039f42f1edc81257f9fab0de03d9
Author: Timo Walther 
AuthorDate: Wed Jul 10 08:46:29 2019 +0200

[FLINK-13078][table-common] Simplify serializable string representation for 
parsers
---
 .../java/org/apache/flink/table/types/logical/AnyType.java   |  4 ++--
 .../org/apache/flink/table/types/logical/SymbolType.java |  2 +-
 .../flink/table/types/logical/TypeInformationAnyType.java|  2 +-
 .../java/org/apache/flink/table/types/LogicalTypesTest.java  | 12 ++--
 4 files changed, 10 insertions(+), 10 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java
index ba849ca..82e805d 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/AnyType.java
@@ -35,7 +35,7 @@ import java.util.Set;
  * Logical type of an arbitrary serialized type. This type is a black box 
within the table ecosystem
  * and is only deserialized at the edges. The any type is an extension to the 
SQL standard.
  *
- * The serialized string representation is {@code ANY(c, s)} where {@code 
c} is the originating
+ * The serialized string representation is {@code ANY('c', 's')} where 
{@code c} is the originating
  * class and {@code s} is the serialized {@link TypeSerializerSnapshot} in 
Base64 encoding.
  *
  * @param <T> originating class for this type
@@ -43,7 +43,7 @@ import java.util.Set;
 @PublicEvolving
 public final class AnyType<T> extends LogicalType {
 
-   private static final String FORMAT = "ANY(%s, %s)";
+   private static final String FORMAT = "ANY('%s', '%s')";
 
	private static final Set<String> INPUT_OUTPUT_CONVERSION = 
conversionSet(
byte[].class.getName(),
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/SymbolType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/SymbolType.java
index c57857c..643e10f 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/SymbolType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/SymbolType.java
@@ -40,7 +40,7 @@ import java.util.Objects;
 @PublicEvolving
 public final class SymbolType<T extends Enum<T>> extends LogicalType {
 
-   private static final String FORMAT = "SYMBOL(%s)";
+   private static final String FORMAT = "SYMBOL('%s')";
 
	private final Class<T> symbolClass;
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java
index 2c1c0dc..9e64044 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TypeInformationAnyType.java
@@ -49,7 +49,7 @@ import java.util.Set;
 @PublicEvolving
 public final class TypeInformationAnyType<T> extends LogicalType {
 
-   private static final String FORMAT = "ANY(%s, ?)";
+   private static final String FORMAT = "ANY('%s', ?)";
 
	private static final Set<String> INPUT_OUTPUT_CONVERSION = 
conversionSet(
byte[].class.getName(),
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
index edf28cf..49feb47 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -557,7 +557,7 @@ public class LogicalTypesTest {
 
testEquality(anyType, new 
TypeInformationAnyType<>(Types.TUPLE(Types.STRING, Types.LONG)));
 
-   testStringSummary(anyType, 
"ANY(org.apache.flink.api.java.tuple.Tuple2, ?)");
+   testStringSummary(anyType, 
"ANY('org.apache.flink.api.java.tuple.Tuple2', ?)");
 
testNullability(anyType);
 
@@ -572,8 +572,8 @@ public class LogicalTypesTest {
public void testAnyType() {
testAll(
new AnyType<>(Human.class, new 
KryoSerializer<>(Human.class, new ExecutionConfig())),
-   
"ANY(org.apache.flink.table.types.LogicalTypesTest$Human, " +
-   
"AEdvcmcuYXBhY2hlLmZsaW5rLmFwaS5qYXZhLnR5cGV1dGlscy5ydW50aW1lLmtyeW8uS3J5b1Nlcml"
 +
+   
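
For illustration, why the quoting helps: with both arguments rendered as quoted
character strings, a parser can find the argument boundaries unambiguously. A
trivial sketch using the new format string; the class name and payload below
are made-up placeholders:

    public class AnyFormatSketch {
        public static void main(String[] args) {
            // Old form: ANY(com.example.Foo, AEdvcm...) -- unquoted, so the
            // end of the Base64 payload is ambiguous for a parser.
            // New form quotes both arguments:
            String serialized = String.format(
                "ANY('%s', '%s')", "com.example.Foo", "AEdvcm...");
            System.out.println(serialized);
            // ANY('com.example.Foo', 'AEdvcm...')
        }
    }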

[flink] 01/06: [hotfix][table-common] Fix minor typos in ObjectIdentifier

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3c443a0ea4da5103ebe4abe26f1da2b639a8206b
Author: Timo Walther 
AuthorDate: Wed Jul 10 08:40:47 2019 +0200

[hotfix][table-common] Fix minor typos in ObjectIdentifier
---
 .../java/org/apache/flink/table/catalog/ObjectIdentifier.java | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectIdentifier.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectIdentifier.java
index b791a70..6e2f9c7 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectIdentifier.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectIdentifier.java
@@ -33,15 +33,15 @@ import static 
org.apache.flink.table.utils.EncodingUtils.escapeIdentifier;
  * While {@link ObjectPath} is used within the same catalog, instances of 
this class can be used
  * across catalogs.
  *
- * Two objects are considered equal if they share the same type identifier 
in a stable session context.
+ * Two objects are considered equal if they share the same object 
identifier in a stable session context.
  */
 public final class ObjectIdentifier implements Serializable {
 
-   private String catalogName;
+   private final String catalogName;
 
-   private String databaseName;
+   private final String databaseName;
 
-   private String objectName;
+   private final String objectName;
 
public static ObjectIdentifier of(String catalogName, String 
databaseName, String objectName) {
return new ObjectIdentifier(catalogName, databaseName, 
objectName);



[flink] 02/06: [hotfix][table-common] Update list of synonyms for logical types

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 15ef025bd387716b1a2259ea238bd1222623af76
Author: Timo Walther 
AuthorDate: Wed Jul 10 08:43:18 2019 +0200

[hotfix][table-common] Update list of synonyms for logical types
---
 .../main/java/org/apache/flink/table/types/logical/DecimalType.java| 3 ++-
 .../src/main/java/org/apache/flink/table/types/logical/DoubleType.java | 3 ++-
 .../src/main/java/org/apache/flink/table/types/logical/IntType.java| 2 +-
 .../src/main/java/org/apache/flink/table/types/logical/RowType.java| 2 +-
 .../src/main/java/org/apache/flink/table/types/logical/TimeType.java   | 3 ++-
 .../main/java/org/apache/flink/table/types/logical/TimestampType.java  | 3 ++-
 6 files changed, 10 insertions(+), 6 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DecimalType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DecimalType.java
index c11ede0..e91793d 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DecimalType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DecimalType.java
@@ -34,7 +34,8 @@ import java.util.Set;
  * digits in a number (=precision) and {@code s} is the number of digits to 
the right of the decimal
  * point in a number (=scale). {@code p} must have a value between 1 and 38 
(both inclusive). {@code s}
  * must have a value between 0 and {@code p} (both inclusive). The default 
value for {@code p} is 10.
- * The default value for {@code s} is 0.
+ * The default value for {@code s} is 0. {@code NUMERIC(p, s)} and {@code 
DEC(p, s)} are synonyms for
+ * this type.
  */
 @PublicEvolving
 public final class DecimalType extends LogicalType {
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DoubleType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DoubleType.java
index 90d1429..ea4f5b4 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DoubleType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/DoubleType.java
@@ -27,7 +27,8 @@ import java.util.Set;
 /**
  * Logical type of an 8-byte double precision floating point number.
  *
- * The serialized string representation is {@code DOUBLE}.
+ * The serialized string representation is {@code DOUBLE}. {@code DOUBLE 
PRECISION} is a synonym
+ * for this type.
  */
 @PublicEvolving
 public final class DoubleType extends LogicalType {
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/IntType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/IntType.java
index 567d8ef..f362b36 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/IntType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/IntType.java
@@ -27,7 +27,7 @@ import java.util.Set;
 /**
  * Logical type of a 4-byte signed integer with values from -2,147,483,648 to 
2,147,483,647.
  *
- * The serialized string representation is {@code INT}.
+ * The serialized string representation is {@code INT}. {@code INTEGER} is 
a synonym for this type.
  */
 @PublicEvolving
 public final class IntType extends LogicalType {
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
index 5a95e7b..c3a5cd1 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
@@ -47,7 +47,7 @@ import static 
org.apache.flink.table.utils.EncodingUtils.escapeSingleQuotes;
  *
  * The serialized string representation is {@code ROW<n0 t0 'd0', n1 t1 'd1', ...>} where
  * {@code n} is the unique name of a field, {@code t} is the logical type of a 
field, {@code d} is
- * the description of a field.
+ * the description of a field. {@code ROW(...)} is a synonym for being closer 
to the SQL standard.
  */
 @PublicEvolving
 public final class RowType extends LogicalType {
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java
index 15763d3..239eed5 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/TimeType.java
@@ -35,7 +35,8 @@ import java.util.Set;
  *
  * The 

[flink] 05/06: [FLINK-13078][table-common] Add an unresolved user-defined logical type

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e5d4ec09bfa635590b1a5d506a3b6dee714cbc62
Author: Timo Walther 
AuthorDate: Wed Jul 10 08:51:22 2019 +0200

[FLINK-13078][table-common] Add an unresolved user-defined logical type
---
 .../flink/table/types/logical/LogicalTypeRoot.java |   3 +
 .../table/types/logical/LogicalTypeVisitor.java|   4 +-
 .../types/logical/UnresolvedUserDefinedType.java   | 153 +
 .../apache/flink/table/types/LogicalTypesTest.java |  11 ++
 4 files changed, 169 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
index 4725876..6ff360a 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
@@ -156,6 +156,9 @@ public enum LogicalTypeRoot {
LogicalTypeFamily.EXTENSION),
 
SYMBOL(
+   LogicalTypeFamily.EXTENSION),
+
+   UNRESOLVED(
LogicalTypeFamily.EXTENSION);
 
	private final Set<LogicalTypeFamily> families;
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
index 5bf5858..810174f 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java
@@ -24,8 +24,8 @@ import org.apache.flink.annotation.PublicEvolving;
  * The visitor definition of {@link LogicalType}. The visitor transforms a 
logical type into
  * instances of {@code R}.
  *
- * Incomplete types such as the {@link TypeInformationAnyType} are visited 
through the generic
- * {@link #visit(LogicalType)}.
+ * Incomplete types such as the {@link TypeInformationAnyType} or {@link 
UnresolvedUserDefinedType} are visited
+ * through the generic {@link #visit(LogicalType)}.
  *
  * @param <R> result type
  */
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UnresolvedUserDefinedType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UnresolvedUserDefinedType.java
new file mode 100644
index 000..2168c8e
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UnresolvedUserDefinedType.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Placeholder type of an unresolved user-defined type that is identified by a 
partially or fully
+ * qualified path ({@code [catalog].[database].[type]}).
+ *
+ * It assumes that a type has been registered in a catalog and just needs 
to be resolved to a
+ * {@link DistinctType} or {@link StructuredType}.
+ *
+ * Two unresolved types are considered equal if they share the same path in 
a stable session context.
+ *
+ * @see UserDefinedType
+ */
+@PublicEvolving
+public final class UnresolvedUserDefinedType extends LogicalType {
+
+   private final @Nullable String catalog;
+
+   private final @Nullable String database;
+
+   private final String typeIdentifier;
+
+   public UnresolvedUserDefinedType(
+   boolean isNullable,
+   @Nullable String catalog,
+   

[flink] 03/06: [hotfix][table-common] Link timestamp precisions

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d05b436024888e554033a64839056f1b4dfbfcb6
Author: Timo Walther 
AuthorDate: Wed Jul 10 08:44:08 2019 +0200

[hotfix][table-common] Link timestamp precisions
---
 .../apache/flink/table/types/logical/LocalZonedTimestampType.java   | 6 +++---
 .../org/apache/flink/table/types/logical/ZonedTimestampType.java| 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java
index d0c237c..90a8ed9 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java
@@ -53,11 +53,11 @@ import java.util.Set;
 @PublicEvolving
 public final class LocalZonedTimestampType extends LogicalType {
 
-   public static final int MIN_PRECISION = 0;
+   public static final int MIN_PRECISION = TimestampType.MIN_PRECISION;
 
-   public static final int MAX_PRECISION = 9;
+   public static final int MAX_PRECISION = TimestampType.MAX_PRECISION;
 
-   public static final int DEFAULT_PRECISION = 6;
+   public static final int DEFAULT_PRECISION = 
TimestampType.DEFAULT_PRECISION;
 
private static final String FORMAT = "TIMESTAMP(%d) WITH LOCAL TIME 
ZONE";
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ZonedTimestampType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ZonedTimestampType.java
index 22e5538..d6a4464 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ZonedTimestampType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/ZonedTimestampType.java
@@ -49,11 +49,11 @@ import java.util.Set;
 @PublicEvolving
 public final class ZonedTimestampType extends LogicalType {
 
-   public static final int MIN_PRECISION = 0;
+   public static final int MIN_PRECISION = TimestampType.MIN_PRECISION;
 
-   public static final int MAX_PRECISION = 9;
+   public static final int MAX_PRECISION = TimestampType.MAX_PRECISION;
 
-   public static final int DEFAULT_PRECISION = 6;
+   public static final int DEFAULT_PRECISION = 
TimestampType.DEFAULT_PRECISION;
 
private static final String FORMAT = "TIMESTAMP(%d) WITH TIME ZONE";
 



[flink] 06/06: [FLINK-13078][table-common] Add a logical type parser

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b2922105e08ac307835022acbf0bab4ed819d862
Author: Timo Walther 
AuthorDate: Wed Jul 10 08:52:30 2019 +0200

[FLINK-13078][table-common] Add a logical type parser

This adds a parser for all logical types defined in FLIP-37.

This closes #9061.
---
 .../flink/table/types/logical/LogicalType.java |   3 +
 .../types/logical/utils/LogicalTypeParser.java | 900 +
 .../apache/flink/table/utils/EncodingUtils.java|   6 +-
 .../apache/flink/table/utils/TypeStringUtils.java  |  12 +-
 .../flink/table/types/LogicalTypeParserTest.java   | 519 
 5 files changed, 1437 insertions(+), 3 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
index 4e4942a..cc46533 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.types.logical;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
 import org.apache.flink.table.types.logical.utils.LogicalTypeGeneralization;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
@@ -98,6 +99,8 @@ public abstract class LogicalType implements Serializable {
 * Returns a string that fully serializes this instance. The serialized 
string can be used for
 * transmitting or persisting a type.
 *
+* See {@link LogicalTypeParser} for the reverse operation.
+*
 * @return detailed string for transmission or persistence
 */
public abstract String asSerializableString();
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
new file mode 100644
index 000..b6fcd07
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
@@ -0,0 +1,900 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.AnyType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import 
org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import 
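
The diff is truncated here by the archive. For illustration, a round-trip
sketch of how the new parser pairs with asSerializableString(), assuming the
static LogicalTypeParser.parse(String) entry point:

    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.utils.LogicalTypeParser;

    public class ParserRoundTripSketch {
        public static void main(String[] args) {
            // Parse a FLIP-37 type string into a LogicalType tree.
            LogicalType parsed = LogicalTypeParser.parse(
                "ROW<`id` INT NOT NULL, `name` VARCHAR(200)>");

            // asSerializableString() is the reverse operation, so
            // re-parsing its output yields an equal type.
            String serialized = parsed.asSerializableString();
            System.out.println(parsed.equals(
                LogicalTypeParser.parse(serialized))); // expected: true
        }
    }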

[flink] branch release-1.9 updated (66edf7d -> b292210)

2019-07-18 Thread twalthr
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 66edf7d  [FLINK-13315][api-java] Port wmstrategies to api-java-bridge
 new 3c443a0  [hotfix][table-common] Fix minor typos in ObjectIdentifier
 new 15ef025  [hotfix][table-common] Update list of synonyms for logical 
types
 new d05b436  [hotfix][table-common] Link timestamp precisions
 new e750efc  [FLINK-13078][table-common] Simplify serializable string 
representation for parsers
 new e5d4ec0  [FLINK-13078][table-common] Add an unresolved user-defined 
logical type
 new b292210  [FLINK-13078][table-common] Add a logical type parser

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/table/catalog/ObjectIdentifier.java  |   8 +-
 .../apache/flink/table/types/logical/AnyType.java  |   4 +-
 .../flink/table/types/logical/DecimalType.java |   3 +-
 .../flink/table/types/logical/DoubleType.java  |   3 +-
 .../apache/flink/table/types/logical/IntType.java  |   2 +-
 .../types/logical/LocalZonedTimestampType.java |   6 +-
 .../flink/table/types/logical/LogicalType.java |   3 +
 .../flink/table/types/logical/LogicalTypeRoot.java |   3 +
 .../table/types/logical/LogicalTypeVisitor.java|   4 +-
 .../apache/flink/table/types/logical/RowType.java  |   2 +-
 .../flink/table/types/logical/SymbolType.java  |   2 +-
 .../apache/flink/table/types/logical/TimeType.java |   3 +-
 .../flink/table/types/logical/TimestampType.java   |   3 +-
 .../types/logical/TypeInformationAnyType.java  |   2 +-
 .../types/logical/UnresolvedUserDefinedType.java   | 153 
 .../table/types/logical/ZonedTimestampType.java|   6 +-
 .../types/logical/utils/LogicalTypeParser.java | 900 +
 .../apache/flink/table/utils/EncodingUtils.java|   6 +-
 .../apache/flink/table/utils/TypeStringUtils.java  |  12 +-
 .../flink/table/types/LogicalTypeParserTest.java   | 519 
 .../apache/flink/table/types/LogicalTypesTest.java |  23 +-
 21 files changed, 1636 insertions(+), 31 deletions(-)
 create mode 100644 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/UnresolvedUserDefinedType.java
 create mode 100644 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java
 create mode 100644 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java



[flink] branch master updated: [FLINK-13312][hive] move tests for data type mappings between Flink and Hive into its own test class

2019-07-18 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 9783d88  [FLINK-13312][hive] move tests for data type mappings between 
Flink and Hive into its own test class
9783d88 is described below

commit 9783d88463ace4c728de3f5861efd42c2c07e23e
Author: bowen.li 
AuthorDate: Wed Jul 17 14:36:09 2019 -0700

[FLINK-13312][hive] move tests for data type mappings between Flink and 
Hive into its own test class

This PR moves the unit tests for data type mappings between Flink and Hive 
into their own test class.

This closes #9151.
---
 ...adataTest.java => HiveCatalogDataTypeTest.java} | 194 +++--
 .../hive/HiveCatalogGenericMetadataTest.java   | 129 --
 .../apache/flink/table/catalog/CatalogTest.java|   4 +-
 3 files changed, 68 insertions(+), 259 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java
similarity index 62%
copy from 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
copy to 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java
index 83e0132..e9d40fe 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java
@@ -20,10 +20,12 @@ package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.CatalogTestBase;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.config.CatalogConfig;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.BinaryType;
@@ -31,17 +33,36 @@ import org.apache.flink.table.types.logical.VarBinaryType;
 
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.util.Arrays;
+import java.util.HashMap;
 
 import static org.junit.Assert.assertEquals;
 
 /**
- * Test for HiveCatalog on generic metadata.
+ * Test for data type mappings in HiveCatalog.
  */
-public class HiveCatalogGenericMetadataTest extends CatalogTestBase {
+public class HiveCatalogDataTypeTest {
+
+   private static HiveCatalog catalog;
+
+   protected final String db1 = "db1";
+   protected final String db2 = "db2";
+
+   protected final String t1 = "t1";
+   protected final String t2 = "t2";
+   protected final ObjectPath path1 = new ObjectPath(db1, t1);
+   protected final ObjectPath path2 = new ObjectPath(db2, t2);
+   protected final ObjectPath path3 = new ObjectPath(db1, t2);
+
+   @Rule
+   public ExpectedException exception = ExpectedException.none();
 
@BeforeClass
public static void init() {
@@ -49,12 +70,37 @@ public class HiveCatalogGenericMetadataTest extends 
CatalogTestBase {
catalog.open();
}
 
-   // -- TODO: Move data types tests to its own test class as it's 
shared between generic metadata and hive metadata
-   // -- data types --
+   @After
+   public void cleanup() throws Exception {
+   if (catalog.tableExists(path1)) {
+   catalog.dropTable(path1, true);
+   }
+   if (catalog.tableExists(path2)) {
+   catalog.dropTable(path2, true);
+   }
+   if (catalog.tableExists(path3)) {
+   catalog.dropTable(path3, true);
+   }
+   if (catalog.functionExists(path1)) {
+   catalog.dropFunction(path1, true);
+   }
+   if (catalog.databaseExists(db1)) {
+   catalog.dropDatabase(db1, true);
+   }
+   if (catalog.databaseExists(db2)) {
+   catalog.dropDatabase(db2, true);
+   }
+   }
+
+
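
For illustration, the kind of boundary the relocated tests exercise, assuming
Hive's public length constants HiveChar.MAX_CHAR_LENGTH (255) and
HiveVarchar.MAX_VARCHAR_LENGTH (65535):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;
    import org.apache.hadoop.hive.common.type.HiveChar;
    import org.apache.hadoop.hive.common.type.HiveVarchar;

    public class HiveTypeMappingSketch {
        public static void main(String[] args) {
            // Flink CHAR/VARCHAR lengths must stay within Hive's limits;
            // the mapping tests check both the valid maxima and the
            // rejection of anything larger.
            DataType c = DataTypes.CHAR(HiveChar.MAX_CHAR_LENGTH);
            DataType v = DataTypes.VARCHAR(HiveVarchar.MAX_VARCHAR_LENGTH);
            System.out.println(c); // CHAR(255)
            System.out.println(v); // VARCHAR(65535)
        }
    }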

[flink] branch release-1.9 updated: [FLINK-13312][hive] move tests for data type mappings between Flink and Hive into its own test class

2019-07-18 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new d230aac  [FLINK-13312][hive] move tests for data type mappings between 
Flink and Hive into its own test class
d230aac is described below

commit d230aac5a479891b3d5421105c1e862de94a9a89
Author: bowen.li 
AuthorDate: Wed Jul 17 14:36:09 2019 -0700

[FLINK-13312][hive] move tests for data type mappings between Flink and 
Hive into its own test class

This PR moves the unit tests for data type mappings between Flink and Hive 
into their own test class.

This closes #9151.
---
 ...adataTest.java => HiveCatalogDataTypeTest.java} | 194 +++--
 .../hive/HiveCatalogGenericMetadataTest.java   | 129 --
 .../apache/flink/table/catalog/CatalogTest.java|   4 +-
 3 files changed, 68 insertions(+), 259 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java
similarity index 62%
copy from 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
copy to 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java
index 83e0132..e9d40fe 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java
@@ -20,10 +20,12 @@ package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.CatalogTestBase;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.config.CatalogConfig;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.BinaryType;
@@ -31,17 +33,36 @@ import org.apache.flink.table.types.logical.VarBinaryType;
 
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.util.Arrays;
+import java.util.HashMap;
 
 import static org.junit.Assert.assertEquals;
 
 /**
- * Test for HiveCatalog on generic metadata.
+ * Test for data type mappings in HiveCatalog.
  */
-public class HiveCatalogGenericMetadataTest extends CatalogTestBase {
+public class HiveCatalogDataTypeTest {
+
+   private static HiveCatalog catalog;
+
+   protected final String db1 = "db1";
+   protected final String db2 = "db2";
+
+   protected final String t1 = "t1";
+   protected final String t2 = "t2";
+   protected final ObjectPath path1 = new ObjectPath(db1, t1);
+   protected final ObjectPath path2 = new ObjectPath(db2, t2);
+   protected final ObjectPath path3 = new ObjectPath(db1, t2);
+
+   @Rule
+   public ExpectedException exception = ExpectedException.none();
 
@BeforeClass
public static void init() {
@@ -49,12 +70,37 @@ public class HiveCatalogGenericMetadataTest extends 
CatalogTestBase {
catalog.open();
}
 
-   // -- TODO: Move data types tests to its own test class as it's 
shared between generic metadata and hive metadata
-   // -- data types --
+   @After
+   public void cleanup() throws Exception {
+   if (catalog.tableExists(path1)) {
+   catalog.dropTable(path1, true);
+   }
+   if (catalog.tableExists(path2)) {
+   catalog.dropTable(path2, true);
+   }
+   if (catalog.tableExists(path3)) {
+   catalog.dropTable(path3, true);
+   }
+   if (catalog.functionExists(path1)) {
+   catalog.dropFunction(path1, true);
+   }
+   if (catalog.databaseExists(db1)) {
+   catalog.dropDatabase(db1, true);
+   }
+   if (catalog.databaseExists(db2)) {
+   catalog.dropDatabase(db2, true);
+   }
+  

[flink] branch master updated: [FLINK-13327][table-planner-blink] Fixed scala 2.12 compilation

2019-07-18 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new abeb1f5  [FLINK-13327][table-planner-blink] Fixed scala 2.12 
compilation
abeb1f5 is described below

commit abeb1f5cc73e33800d0af1367f345d1bf2f2822d
Author: Dawid Wysakowicz 
AuthorDate: Thu Jul 18 18:26:59 2019 +0200

[FLINK-13327][table-planner-blink] Fixed scala 2.12 compilation
---
 .../apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.scala | 2 +-
 .../src/test/scala/org/apache/flink/table/util/TableTestBase.scala| 2 +-
 .../src/test/scala/org/apache/flink/table/util/testTableSources.scala | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.scala
index 5b1d050..3d0d4d7 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.scala
@@ -180,7 +180,7 @@ object ExecNodeResourceTest {
 /**
   * Batch/Stream [[org.apache.flink.table.sources.TableSource]] for resource 
testing.
   */
-class MockTableSource(val isBounded: Boolean, schema: TableSchema)
+class MockTableSource(override val isBounded: Boolean, schema: TableSchema)
 extends StreamTableSource[BaseRow] {
 
   override def getDataStream(
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index 7a26fb6..28dacb7 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -849,7 +849,7 @@ case class JavaBatchTableTestUtil(test: TableTestBase) 
extends JavaTableTestUtil
 /**
   * Batch/Stream [[org.apache.flink.table.sources.TableSource]] for testing.
   */
-class TestTableSource(val isBounded: Boolean, schema: TableSchema)
+class TestTableSource(override val isBounded: Boolean, schema: TableSchema)
   extends StreamTableSource[Row] {
 
   override def getDataStream(execEnv: environment.StreamExecutionEnvironment): 
DataStream[Row] = {
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
index f51b1e2..fa73905 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
@@ -132,7 +132,7 @@ object TestTableSources {
 }
 
 class TestTableSourceWithTime[T](
-val isBounded: Boolean,
+override val isBounded: Boolean,
 tableSchema: TableSchema,
 returnType: TypeInformation[T],
 values: Seq[T],
@@ -342,7 +342,7 @@ class TestNestedProjectableTableSource(
   * @param filterPushedDown Whether predicates have been pushed down yet.
   */
 class TestFilterableTableSource(
-val isBounded: Boolean,
+override val isBounded: Boolean,
 rowTypeInfo: RowTypeInfo,
 data: Seq[Row],
 filterableFields: Set[String] = Set(),



[flink] branch release-1.9 updated: [FLINK-13327][table-planner-blink] Fixed scala 2.12 compilation

2019-07-18 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new 0faa667  [FLINK-13327][table-planner-blink] Fixed scala 2.12 
compilation
0faa667 is described below

commit 0faa66747775bd75cc2f55c4d6b00560a8c41c05
Author: Dawid Wysakowicz 
AuthorDate: Thu Jul 18 18:26:59 2019 +0200

[FLINK-13327][table-planner-blink] Fixed scala 2.12 compilation
---
 .../apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.scala | 2 +-
 .../src/test/scala/org/apache/flink/table/util/TableTestBase.scala| 2 +-
 .../src/test/scala/org/apache/flink/table/util/testTableSources.scala | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.scala
index 5b1d050..3d0d4d7 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/nodes/resource/ExecNodeResourceTest.scala
@@ -180,7 +180,7 @@ object ExecNodeResourceTest {
 /**
   * Batch/Stream [[org.apache.flink.table.sources.TableSource]] for resource 
testing.
   */
-class MockTableSource(val isBounded: Boolean, schema: TableSchema)
+class MockTableSource(override val isBounded: Boolean, schema: TableSchema)
 extends StreamTableSource[BaseRow] {
 
   override def getDataStream(
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index 7a26fb6..28dacb7 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -849,7 +849,7 @@ case class JavaBatchTableTestUtil(test: TableTestBase) 
extends JavaTableTestUtil
 /**
   * Batch/Stream [[org.apache.flink.table.sources.TableSource]] for testing.
   */
-class TestTableSource(val isBounded: Boolean, schema: TableSchema)
+class TestTableSource(override val isBounded: Boolean, schema: TableSchema)
   extends StreamTableSource[Row] {
 
   override def getDataStream(execEnv: environment.StreamExecutionEnvironment): 
DataStream[Row] = {
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
index f51b1e2..fa73905 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
@@ -132,7 +132,7 @@ object TestTableSources {
 }
 
 class TestTableSourceWithTime[T](
-val isBounded: Boolean,
+override val isBounded: Boolean,
 tableSchema: TableSchema,
 returnType: TypeInformation[T],
 values: Seq[T],
@@ -342,7 +342,7 @@ class TestNestedProjectableTableSource(
   * @param filterPushedDown Whether predicates have been pushed down yet.
   */
 class TestFilterableTableSource(
-val isBounded: Boolean,
+override val isBounded: Boolean,
 rowTypeInfo: RowTypeInfo,
 data: Seq[Row],
 filterableFields: Set[String] = Set(),
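
A note on the one-line change repeated across these test sources: the isBounded
these classes implement is inherited as a concrete member (in Flink 1.9 it is a
Java default method on the StreamTableSource interface), and Scala 2.12 treats
Java default methods as concrete inherited members, so a constructor val that
implements one must carry an explicit `override`; the older 2.11 encoding
accepted the plain val, which is why only the Scala 2.12 build broke. A minimal,
self-contained sketch of the error and the fix, using a Scala trait with a
concrete member as a stand-in for the Java interface (names below are
illustrative, not Flink API):

  // `Source` stands in for the interface whose concrete isBounded member
  // the test sources implement; it is not the real StreamTableSource.
  trait Source {
    def isBounded: Boolean = false
  }

  // Rejected, mirroring the pre-fix declarations:
  //   class BrokenSource(val isBounded: Boolean) extends Source
  //   error: value isBounded needs `override' modifier

  // Accepted: the constructor val explicitly overrides the inherited member.
  class FixedSource(override val isBounded: Boolean) extends Source

  object OverrideDemo extends App {
    println(new FixedSource(isBounded = true).isBounded) // prints: true
  }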



Build failed in Jenkins: flink-snapshot-deployment-1.8 #143

2019-07-18 Thread Apache Jenkins Server
See 


--
Started by timer
[EnvInject] - Loading node environment variables.
Building remotely on H41 (ubuntu xenial) in workspace 

No credentials specified
Wiping out workspace first.
Cloning the remote Git repository
Using shallow clone
Cloning repository https://gitbox.apache.org/repos/asf/flink.git
 > git init  # 
 > timeout=10
Fetching upstream changes from https://gitbox.apache.org/repos/asf/flink.git
 > git --version # timeout=10
 > git fetch --tags --progress https://gitbox.apache.org/repos/asf/flink.git 
 > +refs/heads/*:refs/remotes/origin/* --depth=1
ERROR: Error cloning remote repo 'origin'
hudson.plugins.git.GitException: Command "git fetch --tags --progress 
https://gitbox.apache.org/repos/asf/flink.git 
+refs/heads/*:refs/remotes/origin/* --depth=1" returned status code 128:
stdout: 
stderr: fatal: unable to access 
'https://gitbox.apache.org/repos/asf/flink.git/': Could not resolve host: 
gitbox.apache.org

at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandIn(CliGitAPIImpl.java:2042)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandWithCredentials(CliGitAPIImpl.java:1761)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.access$400(CliGitAPIImpl.java:72)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl$1.execute(CliGitAPIImpl.java:442)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl$2.execute(CliGitAPIImpl.java:655)
at 
org.jenkinsci.plugins.gitclient.RemoteGitImpl$CommandInvocationHandler$1.call(RemoteGitImpl.java:153)
at 
org.jenkinsci.plugins.gitclient.RemoteGitImpl$CommandInvocationHandler$1.call(RemoteGitImpl.java:146)
at hudson.remoting.UserRequest.perform(UserRequest.java:212)
at hudson.remoting.UserRequest.perform(UserRequest.java:54)
at hudson.remoting.Request$2.run(Request.java:369)
at 
hudson.remoting.InterceptingExecutorService$1.call(InterceptingExecutorService.java:72)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Suppressed: hudson.remoting.Channel$CallSiteStackTrace: Remote call to 
H41
at 
hudson.remoting.Channel.attachCallSiteStackTrace(Channel.java:1741)
at 
hudson.remoting.UserRequest$ExceptionResponse.retrieve(UserRequest.java:357)
at hudson.remoting.Channel.call(Channel.java:955)
at 
org.jenkinsci.plugins.gitclient.RemoteGitImpl$CommandInvocationHandler.execute(RemoteGitImpl.java:146)
at sun.reflect.GeneratedMethodAccessor974.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.jenkinsci.plugins.gitclient.RemoteGitImpl$CommandInvocationHandler.invoke(RemoteGitImpl.java:132)
at com.sun.proxy.$Proxy135.execute(Unknown Source)
at hudson.plugins.git.GitSCM.retrieveChanges(GitSCM.java:1152)
at hudson.plugins.git.GitSCM.checkout(GitSCM.java:1192)
at hudson.scm.SCM.checkout(SCM.java:504)
at 
hudson.model.AbstractProject.checkout(AbstractProject.java:1208)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.defaultCheckout(AbstractBuild.java:574)
at 
jenkins.scm.SCMCheckoutStrategy.checkout(SCMCheckoutStrategy.java:86)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:499)
at hudson.model.Run.execute(Run.java:1810)
at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
at 
hudson.model.ResourceController.execute(ResourceController.java:97)
at hudson.model.Executor.run(Executor.java:429)
ERROR: Error cloning remote repo 'origin'
Retrying after 10 seconds
No credentials specified
Wiping out workspace first.
Cloning the remote Git repository
Using shallow clone
Cloning repository https://gitbox.apache.org/repos/asf/flink.git
 > git init  # 
 > timeout=10
Fetching upstream changes from https://gitbox.apache.org/repos/asf/flink.git
 > git --version # timeout=10
 > git fetch --tags --progress https://gitbox.apache.org/repos/asf/flink.git 
 > +refs/heads/*:refs/remotes/origin/* --depth=1
ERROR: Error cloning remote repo 'origin'
hudson.plugins.git.GitException: Command "git fetch --tags --progress 

Build failed in Jenkins: flink-snapshot-deployment-1.7 #252

2019-07-18 Thread Apache Jenkins Server
See 


--
Started by timer
[EnvInject] - Loading node environment variables.
Building remotely on H27 (ubuntu xenial) in workspace 

No credentials specified
Wiping out workspace first.
Cloning the remote Git repository
Using shallow clone
Cloning repository https://gitbox.apache.org/repos/asf/flink.git
 > git init  # 
 > timeout=10
Fetching upstream changes from https://gitbox.apache.org/repos/asf/flink.git
 > git --version # timeout=10
 > git fetch --tags --progress https://gitbox.apache.org/repos/asf/flink.git 
 > +refs/heads/*:refs/remotes/origin/* --depth=1
ERROR: Error cloning remote repo 'origin'
hudson.plugins.git.GitException: Command "git fetch --tags --progress 
https://gitbox.apache.org/repos/asf/flink.git 
+refs/heads/*:refs/remotes/origin/* --depth=1" returned status code 128:
stdout: 
stderr: fatal: unable to access 
'https://gitbox.apache.org/repos/asf/flink.git/': Could not resolve host: 
gitbox.apache.org

at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandIn(CliGitAPIImpl.java:2042)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.launchCommandWithCredentials(CliGitAPIImpl.java:1761)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl.access$400(CliGitAPIImpl.java:72)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl$1.execute(CliGitAPIImpl.java:442)
at 
org.jenkinsci.plugins.gitclient.CliGitAPIImpl$2.execute(CliGitAPIImpl.java:655)
at 
org.jenkinsci.plugins.gitclient.RemoteGitImpl$CommandInvocationHandler$1.call(RemoteGitImpl.java:153)
at 
org.jenkinsci.plugins.gitclient.RemoteGitImpl$CommandInvocationHandler$1.call(RemoteGitImpl.java:146)
at hudson.remoting.UserRequest.perform(UserRequest.java:212)
at hudson.remoting.UserRequest.perform(UserRequest.java:54)
at hudson.remoting.Request$2.run(Request.java:369)
at 
hudson.remoting.InterceptingExecutorService$1.call(InterceptingExecutorService.java:72)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Suppressed: hudson.remoting.Channel$CallSiteStackTrace: Remote call to 
H27
at 
hudson.remoting.Channel.attachCallSiteStackTrace(Channel.java:1741)
at 
hudson.remoting.UserRequest$ExceptionResponse.retrieve(UserRequest.java:357)
at hudson.remoting.Channel.call(Channel.java:955)
at 
org.jenkinsci.plugins.gitclient.RemoteGitImpl$CommandInvocationHandler.execute(RemoteGitImpl.java:146)
at sun.reflect.GeneratedMethodAccessor974.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.jenkinsci.plugins.gitclient.RemoteGitImpl$CommandInvocationHandler.invoke(RemoteGitImpl.java:132)
at com.sun.proxy.$Proxy135.execute(Unknown Source)
at hudson.plugins.git.GitSCM.retrieveChanges(GitSCM.java:1152)
at hudson.plugins.git.GitSCM.checkout(GitSCM.java:1192)
at hudson.scm.SCM.checkout(SCM.java:504)
at 
hudson.model.AbstractProject.checkout(AbstractProject.java:1208)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.defaultCheckout(AbstractBuild.java:574)
at 
jenkins.scm.SCMCheckoutStrategy.checkout(SCMCheckoutStrategy.java:86)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:499)
at hudson.model.Run.execute(Run.java:1810)
at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
at 
hudson.model.ResourceController.execute(ResourceController.java:97)
at hudson.model.Executor.run(Executor.java:429)
ERROR: Error cloning remote repo 'origin'
Retrying after 10 seconds
No credentials specified
Wiping out workspace first.
Cloning the remote Git repository
Using shallow clone
Cloning repository https://gitbox.apache.org/repos/asf/flink.git
 > git init  # 
 > timeout=10
Fetching upstream changes from https://gitbox.apache.org/repos/asf/flink.git
 > git --version # timeout=10
 > git fetch --tags --progress https://gitbox.apache.org/repos/asf/flink.git 
 > +refs/heads/*:refs/remotes/origin/* --depth=1
ERROR: Error cloning remote repo 'origin'
hudson.plugins.git.GitException: Command "git fetch --tags --progress 

[flink] branch master updated: [FLINK-13269][table] Copy RelDecorrelator & FlinkFilterJoinRule to flink planner to fix CALCITE-3169 & CALCITE-3170

2019-07-18 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 32ad3fa  [FLINK-13269][table] Copy RelDecorrelator & 
FlinkFilterJoinRule to flink planner to fix CALCITE-3169 & CALCITE-3170
32ad3fa is described below

commit 32ad3fa960681ffc8c21179b6592f6e0a6875c11
Author: godfreyhe 
AuthorDate: Tue Jul 16 21:54:13 2019 +0800

[FLINK-13269][table] Copy RelDecorrelator & FlinkFilterJoinRule to flink 
planner to fix CALCITE-3169 & CALCITE-3170

This closes #9122
---
 .../apache/calcite/sql2rel/RelDecorrelator.java| 14 ++
 .../plan/rules/logical/FlinkFilterJoinRule.java|  5 +++--
 .../apache/calcite/sql2rel/RelDecorrelator.java| 22 ++
 .../plan/rules/logical/FlinkFilterJoinRule.java|  5 +++--
 4 files changed, 30 insertions(+), 16 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
index d260514..dedddfb 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
@@ -117,8 +117,13 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 
 /**
- *  This class is copied from Apache Calcite except that it supports SEMI/ANTI 
join.
- *  NOTES: This file should be deleted when CALCITE-3169 and CALCITE-3170 are 
fixed.
+ * This class is copied from Apache Calcite except that it supports SEMI/ANTI 
join.
+ * NOTES: This file should be deleted when CALCITE-3169 and CALCITE-3170 are 
fixed,
+ * and please make sure to synchronize with RelDecorrelator in flink planner 
when changing this class.
+ * Modification:
+ * 1. lines changed (249-251)
+ * 2. lines changed (271-278)
+ * 3. lines changed (1214-1215)
  */
 
 /**
@@ -241,8 +246,9 @@ public class RelDecorrelator implements ReflectiveVisitor {
 .addRuleInstance(new AdjustProjectForCountAggregateRule(false, f))
 .addRuleInstance(new AdjustProjectForCountAggregateRule(true, f))
 .addRuleInstance(
-new FilterJoinRule.FilterIntoJoinRule(true, f,
-FilterJoinRule.TRUE_PREDICATE))
+// use FilterJoinRule instead of FlinkFilterJoinRule once 
CALCITE-3170 is fixed
+new FlinkFilterJoinRule.FlinkFilterIntoJoinRule(true, f,
+FlinkFilterJoinRule.TRUE_PREDICATE))
 .addRuleInstance(
 new FilterProjectTransposeRule(Filter.class, Project.class, true,
 true, f))
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java
index 3513ba7..084fc0e 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java
@@ -42,9 +42,10 @@ import java.util.Objects;
 
 /**
 * This rule is copied from Calcite's {@link 
org.apache.calcite.rel.rules.FilterJoinRule}.
- * NOTES: This file should be deleted when CALCITE-3170 is fixed.
+ * NOTES: This file should be deleted when CALCITE-3170 is fixed,
+ * and please make sure to synchronize with FlinkFilterJoinRule in flink 
planner when changing this class.
  * Modification:
- * - Handles the ON condition of anti-join can not be pushed down
+ * - Handles the ON condition of anti-join can not be pushed down, lines added 
(192-198)
  */
 
 /**
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
similarity index 99%
copy from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
copy to 
flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
index d260514..8d5388d 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
@@ -117,8 +117,13 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 
 /**
- *  This class is copied from Apache Calcite except that it supports SEMI/ANTI 
join.
- *  NOTES: This file should be deleted when CALCITE-3169 and CALCITE-3170 are 
fixed.
+ * This class is copied from Apache Calcite except that it supports SEMI/ANTI 
join.
+ * NOTES: This 
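
The modification notes above record the heart of the change: for an anti join, a
left row that fails the ON condition is emitted, not dropped, so hoisting a
left-side predicate out of the join condition and pushing it below the join
silently loses rows; an inner join has no such problem, which is why Calcite's
original rule pushes such predicates. A self-contained sketch of the semantics
on plain Scala collections (illustrative only, not the planner's code):

  object AntiJoinPushdownDemo extends App {
    case class L(k: Int, a: Int)
    case class R(k: Int)
    val left  = Seq(L(1, -1), L(2, 5))
    val right = Seq(R(1), R(2))

    // Anti join with ON condition: l.a > 0 AND l.k = r.k.
    def cond(l: L, r: R): Boolean = l.a > 0 && l.k == r.k
    val antiJoin = left.filter(l => !right.exists(r => cond(l, r)))
    println(antiJoin)   // List(L(1,-1)) -- L(1,-1) never matches, so it is kept

    // Invalid "pushdown": filtering l.a > 0 below the join drops L(1,-1),
    // the very row the anti join should emit.
    val pushedDown = left.filter(_.a > 0)
      .filter(l => !right.exists(r => l.k == r.k))
    println(pushedDown) // List() -- wrong result
  }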

Jenkins build is back to normal : flink-snapshot-deployment-1.9 #4

2019-07-18 Thread Apache Jenkins Server
See 




[flink] branch release-1.9 updated: [FLINK-13269][table] Copy RelDecorrelator & FlinkFilterJoinRule to flink planner to fix CALCITE-3169 & CALCITE-3170

2019-07-18 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new 5bff822  [FLINK-13269][table] Copy RelDecorrelator & 
FlinkFilterJoinRule to flink planner to fix CALCITE-3169 & CALCITE-3170
5bff822 is described below

commit 5bff82205ee29e9ae82cd15588a5dd471c48
Author: godfreyhe 
AuthorDate: Tue Jul 16 21:54:13 2019 +0800

[FLINK-13269][table] Copy RelDecorrelator & FlinkFilterJoinRule to flink 
planner to fix CALCITE-3169 & CALCITE-3170

This closes #9122
---
 .../apache/calcite/sql2rel/RelDecorrelator.java| 14 ++
 .../plan/rules/logical/FlinkFilterJoinRule.java|  5 +++--
 .../apache/calcite/sql2rel/RelDecorrelator.java| 22 ++
 .../plan/rules/logical/FlinkFilterJoinRule.java|  5 +++--
 4 files changed, 30 insertions(+), 16 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
index d260514..dedddfb 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
@@ -117,8 +117,13 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 
 /**
- *  This class is copied from Apache Calcite except that it supports SEMI/ANTI 
join.
- *  NOTES: This file should be deleted when CALCITE-3169 and CALCITE-3170 are 
fixed.
+ * This class is copied from Apache Calcite except that it supports SEMI/ANTI 
join.
+ * NOTES: This file should be deleted when CALCITE-3169 and CALCITE-3170 are 
fixed,
+ * and please make sure to synchronize with RelDecorrelator in flink planner 
when changing this class.
+ * Modification:
+ * 1. lines changed (249-251)
+ * 2. lines changed (271-278)
+ * 3. lines changed (1214-1215)
  */
 
 /**
@@ -241,8 +246,9 @@ public class RelDecorrelator implements ReflectiveVisitor {
 .addRuleInstance(new AdjustProjectForCountAggregateRule(false, f))
 .addRuleInstance(new AdjustProjectForCountAggregateRule(true, f))
 .addRuleInstance(
-new FilterJoinRule.FilterIntoJoinRule(true, f,
-FilterJoinRule.TRUE_PREDICATE))
+// use FilterJoinRule instead of FlinkFilterJoinRule once 
CALCITE-3170 is fixed
+new FlinkFilterJoinRule.FlinkFilterIntoJoinRule(true, f,
+FlinkFilterJoinRule.TRUE_PREDICATE))
 .addRuleInstance(
 new FilterProjectTransposeRule(Filter.class, Project.class, true,
 true, f))
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java
index 3513ba7..084fc0e 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/rules/logical/FlinkFilterJoinRule.java
@@ -42,9 +42,10 @@ import java.util.Objects;
 
 /**
 * This rule is copied from Calcite's {@link 
org.apache.calcite.rel.rules.FilterJoinRule}.
- * NOTES: This file should be deleted when CALCITE-3170 is fixed.
+ * NOTES: This file should be deleted when CALCITE-3170 is fixed,
+ * and please make sure to synchronize with FlinkFilterJoinRule in flink 
planner when changing this class.
  * Modification:
- * - Handles the ON condition of anti-join can not be pushed down
+ * - Handles the ON condition of anti-join can not be pushed down, lines added 
(192-198)
  */
 
 /**
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
similarity index 99%
copy from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
copy to 
flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
index d260514..8d5388d 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
@@ -117,8 +117,13 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 
 /**
- *  This class is copied from Apache Calcite except that it supports SEMI/ANTI 
join.
- *  NOTES: This file should be deleted when CALCITE-3169 and CALCITE-3170 are 
fixed.
+ * This class is copied from Apache Calcite except that it supports SEMI/ANTI 
join.
+ *
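
For readers wiring this up elsewhere: a rough registration sketch, assuming
Calcite and the blink planner on the classpath. The FlinkFilterIntoJoinRule
constructor arguments are taken from the RelDecorrelator hunk above;
RelFactories.LOGICAL_BUILDER stands in for the diff's RelBuilderFactory `f`,
and the surrounding object is hypothetical:

  import org.apache.calcite.plan.hep.{HepPlanner, HepProgramBuilder}
  import org.apache.calcite.rel.core.RelFactories
  import org.apache.flink.table.plan.rules.logical.FlinkFilterJoinRule

  object RuleRegistrationSketch extends App {
    val program = new HepProgramBuilder()
      .addRuleInstance(new FlinkFilterJoinRule.FlinkFilterIntoJoinRule(
        true,                                 // "smart" mode, as in the diff
        RelFactories.LOGICAL_BUILDER,         // a RelBuilderFactory, like `f`
        FlinkFilterJoinRule.TRUE_PREDICATE))  // no extra pushdown restriction
      .build()
    // A HepPlanner built from this program applies the anti-join-aware copy
    // instead of Calcite's original FilterIntoJoinRule.
    val planner = new HepPlanner(program)
    println(planner.getClass.getSimpleName)
  }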