[GitHub] [flink] yumengz5 edited a comment on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint

2019-06-26 Thread GitBox
yumengz5 edited a comment on issue #8617: [FLINK-12619][StateBackend]Support 
TERMINATE/SUSPEND Job with Checkpoint
URL: https://github.com/apache/flink/pull/8617#issuecomment-506202631
 
 
   In my opinion, if the checkpoint times out, this checkpoint will be 
discarded. But the checkpoint coordinator would handle such an abort as usual 
and assume that the Flink job continues running. In fact, the job looks like it is 
"running", but it doesn't process any records. In the sync-checkpoint case, every 
task is blocked after triggering the checkpoint. Unblocking only happens 
either when the task is cancelled or when the corresponding checkpoint is 
acknowledged. I guess stop-with-savepoint may have the same issue; I'm not sure 
whether it will actually happen.
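
   For illustration, a minimal, hypothetical Java sketch of the "block until 
acknowledged or cancelled" behaviour described above (class and method names are 
invented for this sketch; this is not Flink's actual task code):

```java
// Hypothetical sketch of the blocking pattern described above, not Flink code.
final class SyncCheckpointLatch {

	private final Object lock = new Object();
	private boolean acknowledged;
	private boolean cancelled;

	/** Called by the task thread after it has triggered a synchronous checkpoint. */
	void awaitAcknowledgementOrCancellation() throws InterruptedException {
		synchronized (lock) {
			// The task still looks "running" but processes no records while waiting here.
			while (!acknowledged && !cancelled) {
				lock.wait();
			}
		}
	}

	/** Called when the checkpoint coordinator acknowledges the checkpoint. */
	void acknowledge() {
		synchronized (lock) {
			acknowledged = true;
			lock.notifyAll();
		}
	}

	/** Called when the task is cancelled, e.g. after the checkpoint timed out. */
	void cancel() {
		synchronized (lock) {
			cancelled = true;
			lock.notifyAll();
		}
	}
}
```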


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yumengz5 commented on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint

2019-06-26 Thread GitBox
yumengz5 commented on issue #8617: [FLINK-12619][StateBackend]Support 
TERMINATE/SUSPEND Job with Checkpoint
URL: https://github.com/apache/flink/pull/8617#issuecomment-506202631
 
 
   In my opinion, if the checkpoint times out, this checkpoint will be 
discarded. But the checkpoint coordinator would handle such an abort as usual 
and assume that the Flink job continues running. In fact, the job looks like it is 
"running", but it doesn't process any records. In the sync-checkpoint case, every 
task is blocked after triggering the checkpoint. Unblocking only happens 
either when the task is cancelled or when the corresponding checkpoint is 
acknowledged. I guess stop-with-savepoint may have the same issue; I'm not sure 
whether it will actually happen.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.

2019-06-26 Thread GitBox
wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix 
instability of BoundedBlockingSubpartition under memory pressure.
URL: https://github.com/apache/flink/pull/8880#discussion_r298015077
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java
 ##
 @@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+final class FileChannelMemoryMappedBoundedData implements BoundedData {
+
+   /** The file channel backing the memory mapped file. */
+   private final FileChannel fileChannel;
+
+   /** The reusable array with header buffer and data buffer, to use 
gathering writes on the
+* file channel ({@link 
java.nio.channels.GatheringByteChannel#write(ByteBuffer[])}). */
+   private final ByteBuffer[] headerAndBufferArray;
+
+   /** All memory mapped regions. */
+   private final ArrayList<ByteBuffer> memoryMappedRegions;
+
+   /** The path of the memory mapped file. */
+   private final Path filePath;
+
+   /** The position in the file channel. Cached for efficiency, because an 
actual position
+* lookup in the channel involves various locks and checks. */
+   private long pos;
+
+   /** The position where the current memory mapped region must end. */
+   private long endOfCurrentRegion;
+
+   /** The position where the current memory mapped region started. */
+   private long startOfCurrentRegion;
+
+   /** The maximum size of each mapped region. */
+   private final long maxRegionSize;
+
+   FileChannelMemoryMappedBoundedData(
+   Path filePath,
+   FileChannel fileChannel,
+   int maxSizePerMappedRegion) throws IOException {
+
+   this.filePath = filePath;
+   this.fileChannel = fileChannel;
+   this.headerAndBufferArray = 
BufferReaderWriterUtil.allocatedWriteBufferArray();
+   this.memoryMappedRegions = new ArrayList<>(4);
+   this.maxRegionSize = maxSizePerMappedRegion;
+   this.endOfCurrentRegion = maxSizePerMappedRegion;
+   }
+
+   @Override
+   public void writeBuffer(Buffer buffer) throws IOException {
+   if (tryWriteBuffer(buffer)) {
+   return;
+   }
+
+   mapRegionAndStartNext();
+
+   if (!tryWriteBuffer(buffer)) {
+   throwTooLargeBuffer(buffer);
+   }
+   }
+
+   private boolean tryWriteBuffer(Buffer buffer) throws IOException {
+   final long spaceLeft = endOfCurrentRegion - pos;
+   final long bytesWritten = 
BufferReaderWriterUtil.writeToByteChannelIfBelowSize(
+   fileChannel, buffer, headerAndBufferArray, 
spaceLeft);
+
+   if (bytesWritten >= 0) {
+   pos += bytesWritten;
+   return true;
+   }
+   else {
+   return false;
+   }
+   }
+
+   @Override
+   public BoundedData.Reader createReader() {
+   checkState(!fileChannel.isOpen());
+
+   final List<ByteBuffer> buffers = memoryMappedRegions.stream()
+   .map((bb) -> bb.duplicate().order(ByteOrder.nativeOrder()))
+   .collect(Collectors.toList());
+
+   return new MemoryMappedBoundedData.BufferSlicer(buffers);
+   }
+
+   /**

[GitHub] [flink] wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.

2019-06-26 Thread GitBox
wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix 
instability of BoundedBlockingSubpartition under memory pressure.
URL: https://github.com/apache/flink/pull/8880#discussion_r298015030
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java
 ##
 @@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+final class FileChannelMemoryMappedBoundedData implements BoundedData {
+
+   /** The file channel backing the memory mapped file. */
+   private final FileChannel fileChannel;
+
+   /** The reusable array with header buffer and data buffer, to use 
gathering writes on the
+* file channel ({@link 
java.nio.channels.GatheringByteChannel#write(ByteBuffer[])}). */
+   private final ByteBuffer[] headerAndBufferArray;
+
+   /** All memory mapped regions. */
+   private final ArrayList<ByteBuffer> memoryMappedRegions;
+
+   /** The path of the memory mapped file. */
+   private final Path filePath;
+
+   /** The position in the file channel. Cached for efficiency, because an 
actual position
+* lookup in the channel involves various locks and checks. */
+   private long pos;
+
+   /** The position where the current memory mapped region must end. */
+   private long endOfCurrentRegion;
+
+   /** The position where the current memory mapped region started. */
+   private long startOfCurrentRegion;
+
+   /** The maximum size of each mapped region. */
+   private final long maxRegionSize;
+
+   FileChannelMemoryMappedBoundedData(
+   Path filePath,
+   FileChannel fileChannel,
+   int maxSizePerMappedRegion) throws IOException {
+
+   this.filePath = filePath;
+   this.fileChannel = fileChannel;
+   this.headerAndBufferArray = 
BufferReaderWriterUtil.allocatedWriteBufferArray();
+   this.memoryMappedRegions = new ArrayList<>(4);
+   this.maxRegionSize = maxSizePerMappedRegion;
+   this.endOfCurrentRegion = maxSizePerMappedRegion;
+   }
+
+   @Override
+   public void writeBuffer(Buffer buffer) throws IOException {
+   if (tryWriteBuffer(buffer)) {
+   return;
+   }
+
+   mapRegionAndStartNext();
+
+   if (!tryWriteBuffer(buffer)) {
+   throwTooLargeBuffer(buffer);
+   }
+   }
+
+   private boolean tryWriteBuffer(Buffer buffer) throws IOException {
+   final long spaceLeft = endOfCurrentRegion - pos;
+   final long bytesWritten = 
BufferReaderWriterUtil.writeToByteChannelIfBelowSize(
+   fileChannel, buffer, headerAndBufferArray, 
spaceLeft);
+
+   if (bytesWritten >= 0) {
+   pos += bytesWritten;
+   return true;
+   }
+   else {
+   return false;
+   }
+   }
+
+   @Override
+   public BoundedData.Reader createReader() {
+   checkState(!fileChannel.isOpen());
+
+   final List<ByteBuffer> buffers = memoryMappedRegions.stream()
+   .map((bb) -> bb.duplicate().order(ByteOrder.nativeOrder()))
+   .collect(Collectors.toList());
+
+   return new MemoryMappedBoundedData.BufferSlicer(buffers);
+   }
+
+   /**

[jira] [Updated] (FLINK-11388) Add an OSS RecoverableWriter

2019-06-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11388:
---
Labels: pull-request-available  (was: )

> Add an OSS RecoverableWriter
> 
>
> Key: FLINK-11388
> URL: https://issues.apache.org/jira/browse/FLINK-11388
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / FileSystem
>Affects Versions: 1.7.1
>Reporter: wujinhu
>Assignee: wujinhu
>Priority: Major
>  Labels: pull-request-available
>
> OSS offers persistence only after uploads or multi-part uploads complete. In 
> order to let streaming jobs use OSS as a sink, we should implement a Recoverable 
> writer. This writer will snapshot and store multi-part upload information and 
> recover from that information when a failure occurs.
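
For illustration only, a hedged sketch of the kind of state such a recoverable 
writer would need to snapshot (class and field names are assumptions made for 
this sketch, not the actual FLINK-11388 classes):

{code}
import java.io.Serializable;
import java.util.List;

// Illustrative only: state an OSS recoverable writer could snapshot so that an
// in-progress multi-part upload can be resumed after a failure.
final class OssMultiPartUploadSnapshot implements Serializable {

	private final String objectName;                // target object in the OSS bucket
	private final String uploadId;                  // id of the in-progress multi-part upload
	private final List<String> completedPartETags;  // etags of parts already uploaded
	private final long numBytesInProgress;          // size of the part currently being written

	OssMultiPartUploadSnapshot(
			String objectName,
			String uploadId,
			List<String> completedPartETags,
			long numBytesInProgress) {
		this.objectName = objectName;
		this.uploadId = uploadId;
		this.completedPartETags = completedPartETags;
		this.numBytesInProgress = numBytesInProgress;
	}
}
{code}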



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] wujinhu closed pull request #7798: [FLINK-11388][fs] Add Aliyun OSS recoverable writer

2019-06-26 Thread GitBox
wujinhu closed pull request #7798: [FLINK-11388][fs] Add Aliyun OSS recoverable 
writer
URL: https://github.com/apache/flink/pull/7798
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.

2019-06-26 Thread GitBox
sunjincheng121 commented on a change in pull request #8863: 
[FLINK-12962][python] Allows pyflink to be pip installed.
URL: https://github.com/apache/flink/pull/8863#discussion_r298012822
 
 

 ##
 File path: docs/flinkDev/building.md
 ##
 @@ -58,6 +58,17 @@ mvn clean install -DskipTests -Dfast
 
 The default build adds a Flink-specific JAR for Hadoop 2, to allow using Flink 
with HDFS and YARN.
 
+## Build PyFlink
+
+If you want to build a PyFlink package that can be used for pip installation, 
you need to build the Flink jars first, as described in [Build Flink](##Build 
Flink).
+Then go to the root directory of the Flink source code and run this command to 
build an sdist package:
+
+{% highlight bash %}
+cd flink-python; python setup.py sdist
 
 Review comment:
   We also need `.whl`, right? `python setup.py sdist bdist_wheel`?
   Details can be found here: 
https://packaging.python.org/tutorials/packaging-projects/


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect all / minus all to blink planner

2019-06-26 Thread GitBox
JingsongLi commented on a change in pull request #8898: 
[FLINK-12936][table-planner-blink] Support intersect all / minus all to blink 
planner
URL: https://github.com/apache/flink/pull/8898#discussion_r298013117
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceSetOpWithJoinRuleBase.scala
 ##
 @@ -55,4 +71,47 @@ abstract class ReplaceSetOpWithJoinRuleBase[T <: SetOp](
 }
 conditions
   }
+
+  protected def replicateRows(
 
 Review comment:
   I think `ReplaceSetOpWithJoinRuleBase` is unnecessary, since `generateCondition` is a 
static method. There could be a `SetOpRewriteUtil` instead.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.

2019-06-26 Thread GitBox
sunjincheng121 commented on a change in pull request #8863: 
[FLINK-12962][python] Allows pyflink to be pip installed.
URL: https://github.com/apache/flink/pull/8863#discussion_r298012822
 
 

 ##
 File path: docs/flinkDev/building.md
 ##
 @@ -58,6 +58,17 @@ mvn clean install -DskipTests -Dfast
 
 The default build adds a Flink-specific JAR for Hadoop 2, to allow using Flink 
with HDFS and YARN.
 
+## Build PyFlink
+
+If you want to build a PyFlink package that can be used for pip installation, 
you need to build the Flink jars first, as described in [Build Flink](##Build 
Flink).
+Then go to the root directory of the Flink source code and run this command to 
build an sdist package:
+
+{% highlight bash %}
+cd flink-python; python setup.py sdist
 
 Review comment:
   We also need `.whl`, right? `python setup.py sdist bdist_wheel`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.

2019-06-26 Thread GitBox
sunjincheng121 commented on a change in pull request #8863: 
[FLINK-12962][python] Allows pyflink to be pip installed.
URL: https://github.com/apache/flink/pull/8863#discussion_r298012822
 
 

 ##
 File path: docs/flinkDev/building.md
 ##
 @@ -58,6 +58,17 @@ mvn clean install -DskipTests -Dfast
 
 The default build adds a Flink-specific JAR for Hadoop 2, to allow using Flink 
with HDFS and YARN.
 
+## Build PyFlink
+
+If you want to build a PyFlink package that can be used for pip installation, 
you need to build the Flink jars first, as described in [Build Flink](##Build 
Flink).
+Then go to the root directory of the Flink source code and run this command to 
build an sdist package:
+
+{% highlight bash %}
+cd flink-python; python setup.py sdist
 
 Review comment:
   We also need `.whl`, right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12751) Create file based HA support

2019-06-26 Thread Yang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873830#comment-16873830
 ] 

Yang Wang commented on FLINK-12751:
---

[~borisl] Sorry for the late reply.

Just as you say, job manager selection is done by a k8s Deployment with a replica 
count of 1. However, if the kubelet crashes, the pod running on it may not be 
terminated, so two job managers may be running at the same time. We need to use 
etcd/ZooKeeper to do the job manager selection.
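
(For illustration only, a hypothetical sketch of that fencing idea; the interfaces 
below are invented for this sketch and are not Flink's HighAvailabilityServices API. 
A job manager only performs leader work while it holds a lease granted by an external 
coordination service such as etcd or ZooKeeper.)

{code}
// Hypothetical interfaces; a real deployment would use an etcd/ZooKeeper client
// (or Flink's ZooKeeper-based HighAvailabilityServices) instead.
interface LeaderLease {
	boolean isStillValid();   // false once another instance has taken over
}

interface CoordinationService {
	LeaderLease tryAcquireLeadership(String componentId) throws Exception;
}

final class FencedJobManagerRunner {

	private final CoordinationService coordination;

	FencedJobManagerRunner(CoordinationService coordination) {
		this.coordination = coordination;
	}

	void run() throws Exception {
		// Even if two job manager pods exist (e.g. after a kubelet crash),
		// only the one holding a valid lease performs leader-only work.
		LeaderLease lease = coordination.tryAcquireLeadership("jobmanager");
		if (lease == null) {
			return; // another instance is currently the leader
		}
		while (lease.isStillValid()) {
			doLeaderWork();       // schedule jobs, commit checkpoints, ...
			Thread.sleep(1000L);  // illustrative pacing only
		}
		// Lease lost: stop acting as leader immediately to avoid a split brain.
	}

	private void doLeaderWork() {
	}
}
{code}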

> Create file based HA support
> 
>
> Key: FLINK-12751
> URL: https://issues.apache.org/jira/browse/FLINK-12751
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystems
>Affects Versions: 1.8.0, 1.9.0, 2.0.0
> Environment: Flink on k8 and Mini cluster
>Reporter: Boris Lublinsky
>Priority: Major
>  Labels: features, pull-request-available
>   Original Estimate: 168h
>  Time Spent: 10m
>  Remaining Estimate: 167h 50m
>
> In the current Flink implementation, HA support can be implemented either 
> using ZooKeeper or a custom factory class.
> Add an HA implementation based on a PVC. The idea behind this implementation
> is as follows:
> * Because the implementation assumes a single instance of the Job manager (Job 
> manager selection and restarts are done by a K8s Deployment of 1), 
> URL management is done using the StandaloneHaServices implementation (in the case 
> of a cluster) and the EmbeddedHaServices implementation (in the case of a mini 
> cluster)
> * For management of the submitted Job Graphs, the checkpoint counter and 
> completed checkpoints, the implementation leverages the following file 
> system layout
> {code}
>  ha -> root of the HA data
>  checkpointcounter -> checkpoint counter folder
>   -> job id folder
>   -> counter file
>   -> another job id folder
>  ...
>  completedCheckpoint -> completed checkpoint folder
>   -> job id folder
>   -> checkpoint file
>   -> checkpoint file
>  ...
>   -> another job id folder
>  ...
>  submittedJobGraph -> submitted graph folder
>   -> job id folder
>   -> graph file
>   -> another job id folder
>  ...
> {code}
> An implementation should overwrite 2 of the Flink files:
> * HighAvailabilityServicesUtils - added `FILESYSTEM` option for picking HA 
> service
> * HighAvailabilityMode - added `FILESYSTEM` to available HA options.
> The actual implementation adds the following classes:
> * `FileSystemHAServices` - an implementation of a `HighAvailabilityServices` 
> for file system
> * `FileSystemUtils` - support class for creation of runtime components.
> * `FileSystemStorageHelper` - file system operations implementation for 
> filesystem based HA
> * `FileSystemCheckpointRecoveryFactory` - an implementation of a 
> `CheckpointRecoveryFactory` for file system
> * `FileSystemCheckpointIDCounter` - an implementation of a 
> `CheckpointIDCounter` for file system
> * `FileSystemCompletedCheckpointStore` - an implementation of a 
> `CompletedCheckpointStore` for file system
> * `FileSystemSubmittedJobGraphStore` - an implementation of a 
> `SubmittedJobGraphStore` for file system



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.

2019-06-26 Thread GitBox
sunjincheng121 commented on a change in pull request #8863: 
[FLINK-12962][python] Allows pyflink to be pip installed.
URL: https://github.com/apache/flink/pull/8863#discussion_r298012822
 
 

 ##
 File path: docs/flinkDev/building.md
 ##
 @@ -58,6 +58,17 @@ mvn clean install -DskipTests -Dfast
 
 The default build adds a Flink-specific JAR for Hadoop 2, to allow using Flink 
with HDFS and YARN.
 
+## Build PyFlink
+
+If you want to build a PyFlink package that can be used for pip installation, 
you need to build the Flink jars first, as described in [Build Flink](##Build 
Flink).
+Then go to the root directory of the Flink source code and run this command to 
build an sdist package:
+
+{% highlight bash %}
+cd flink-python; python setup.py sdist
 
 Review comment:
   We also need `.whl`, right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.

2019-06-26 Thread GitBox
wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix 
instability of BoundedBlockingSubpartition under memory pressure.
URL: https://github.com/apache/flink/pull/8880#discussion_r298012847
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java
 ##
 @@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+final class FileChannelMemoryMappedBoundedData implements BoundedData {
+
+   /** The file channel backing the memory mapped file. */
+   private final FileChannel fileChannel;
+
+   /** The reusable array with header buffer and data buffer, to use 
gathering writes on the
+* file channel ({@link 
java.nio.channels.GatheringByteChannel#write(ByteBuffer[])}). */
+   private final ByteBuffer[] headerAndBufferArray;
+
+   /** All memory mapped regions. */
+   private final ArrayList<ByteBuffer> memoryMappedRegions;
+
+   /** The path of the memory mapped file. */
+   private final Path filePath;
+
+   /** The position in the file channel. Cached for efficiency, because an 
actual position
+* lookup in the channel involves various locks and checks. */
+   private long pos;
+
+   /** The position where the current memory mapped region must end. */
+   private long endOfCurrentRegion;
+
+   /** The position where the current memory mapped region started. */
+   private long startOfCurrentRegion;
+
+   /** The maximum size of each mapped region. */
+   private final long maxRegionSize;
+
+   FileChannelMemoryMappedBoundedData(
+   Path filePath,
+   FileChannel fileChannel,
+   int maxSizePerMappedRegion) throws IOException {
+
+   this.filePath = filePath;
+   this.fileChannel = fileChannel;
+   this.headerAndBufferArray = 
BufferReaderWriterUtil.allocatedWriteBufferArray();
+   this.memoryMappedRegions = new ArrayList<>(4);
+   this.maxRegionSize = maxSizePerMappedRegion;
+   this.endOfCurrentRegion = maxSizePerMappedRegion;
+   }
+
+   @Override
+   public void writeBuffer(Buffer buffer) throws IOException {
+   if (tryWriteBuffer(buffer)) {
+   return;
+   }
+
+   mapRegionAndStartNext();
+
+   if (!tryWriteBuffer(buffer)) {
+   throwTooLargeBuffer(buffer);
+   }
+   }
+
+   private boolean tryWriteBuffer(Buffer buffer) throws IOException {
+   final long spaceLeft = endOfCurrentRegion - pos;
+   final long bytesWritten = 
BufferReaderWriterUtil.writeToByteChannelIfBelowSize(
+   fileChannel, buffer, headerAndBufferArray, 
spaceLeft);
+
+   if (bytesWritten >= 0) {
+   pos += bytesWritten;
+   return true;
+   }
+   else {
 
 Review comment:
   else can be removed
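
   A sketch of what that simplification could look like when applied to the 
`tryWriteBuffer` method quoted above (the early return makes the `else` branch 
unnecessary):

```java
private boolean tryWriteBuffer(Buffer buffer) throws IOException {
	final long spaceLeft = endOfCurrentRegion - pos;
	final long bytesWritten = BufferReaderWriterUtil.writeToByteChannelIfBelowSize(
			fileChannel, buffer, headerAndBufferArray, spaceLeft);

	if (bytesWritten >= 0) {
		pos += bytesWritten;
		return true;
	}
	// No else needed: the branch above already returned.
	return false;
}
```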


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-13011) Release the PyFlink into PyPI

2019-06-26 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-13011:

Description: 
FLINK-12962 adds the ability to build a PyFlink distribution package, but we 
have not yet released PyFlink to PyPI. The goal of this JIRA is to publish the 
PyFlink distribution package built by FLINK-12962 to PyPI. 

https://pypi.org/
https://packaging.python.org/tutorials/packaging-projects/

 

  was:
FLINK-12962 adds the ability to build a PyFlink distribution package, but we 
have not yet released PyFlink to PyPI. The goal of this JIRA is to publish the 
PyFlink distribution package built by FLINK-12962 to PyPI. https://pypi.org/

 


> Release the PyFlink into PyPI
> -
>
> Key: FLINK-13011
> URL: https://issues.apache.org/jira/browse/FLINK-13011
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Build System
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Priority: Major
>
> FLINK-12962 adds the ability to build a PyFlink distribution package, but we 
> have not yet released PyFlink to PyPI. The goal of this JIRA is to publish the 
> PyFlink distribution package built by FLINK-12962 to PyPI. 
> https://pypi.org/
> https://packaging.python.org/tutorials/packaging-projects/
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12999) Can't generate valid execution plan for "SELECT uuid() FROM (VALUES(1,2,3)) T(a, b, c)"

2019-06-26 Thread Zhenghua Gao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-12999:
-
Summary: Can't generate valid execution plan for "SELECT uuid() FROM 
(VALUES(1,2,3)) T(a, b, c)"  (was: Can't generate valid execution plan for 
"SELECT uuid() FROM VALUES(1,2,3) T(a, b, c)")

> Can't generate valid execution plan for "SELECT uuid() FROM (VALUES(1,2,3)) 
> T(a, b, c)"
> ---
>
> Key: FLINK-12999
> URL: https://issues.apache.org/jira/browse/FLINK-12999
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0
>Reporter: Zhenghua Gao
>Assignee: godfrey he
>Priority: Major
>
> The ERROR message is: 
>  
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query:
> LogicalSink(fields=[EXPR$0])
> +- LogicalProject(EXPR$0=[UUID()])
>  +- LogicalValues(tuples=[[{ 1, 2, 3 }]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> at 
> org.apache.flink.table.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:72)
>  at 
> org.apache.flink.table.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>  at 
> org.apache.flink.table.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>  at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>  at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>  at 
> org.apache.flink.table.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>  at 
> org.apache.flink.table.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:82)
>  at 
> org.apache.flink.table.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:51)
>  at 
> org.apache.flink.table.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:39)
>  at 
> org.apache.flink.table.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:39)
>  at scala.collection.immutable.List.foreach(List.scala:392)
>  at 
> org.apache.flink.table.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:39)
>  at 
> org.apache.flink.table.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:65)
>  at 
> org.apache.flink.table.api.TableEnvironment.optimize(TableEnvironment.scala:251)
>  at 
> org.apache.flink.table.api.TableEnvironment.compileToExecNodePlan(TableEnvironment.scala:200)
>  at 
> org.apache.flink.table.api.TableEnvironment.compile(TableEnvironment.scala:184)
>  at 
> org.apache.flink.table.api.TableEnvironment.generateStreamGraph(TableEnvironment.scala:155)
>  at 
> org.apache.flink.table.api.BatchTableEnvironment.execute(BatchTableEnvironment.scala:93)
>  at 
> org.apache.flink.table.api.TableEnvironment.execute(TableEnvironment.scala:136)
>  at 
> org.apache.flink.table.runtime.utils.BatchTableEnvUtil$.collect(BatchTableEnvUtil.scala:55)
>  at 
> org.apache.flink.table.runtime.utils.TableUtil$.collectSink(TableUtil.scala:60)
>  at 
> org.apache.flink.table.runtime.utils.TableUtil$.collect(TableUtil.scala:41)
>  at 
> org.apache.flink.table.runtime.utils.BatchTestBase.executeQuery(BatchTestBase.scala:308)
>  at 
> org.apache.flink.table.runtime.utils.BatchTestBase.check(BatchTestBase.scala:164)
>  at 
> org.apache.flink.table.runtime.utils.BatchTestBase.checkResult(BatchTestBase.scala:103)
>  at 
> org.apache.flink.table.runtime.batch.sql.ValuesITCase.test(ValuesITCase.scala:38)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> 

[jira] [Updated] (FLINK-13011) Release the PyFlink into PyPI

2019-06-26 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-13011:

Description: 
FLINK-12962 adds the ability to build a PyFlink distribution package, but we 
have not yet released PyFlink to PyPI. The goal of this JIRA is to publish the 
PyFlink distribution package built by FLINK-12962 to PyPI. https://pypi.org/

 

  was:
FLINK-12962 adds the ability to build a PyFlink distribution package, but we 
have not yet released PyFlink to PyPI. The goal of this JIRA is to publish the 
PyFlink distribution package built by FLINK-12962 to PyPI.

 


> Release the PyFlink into PyPI
> -
>
> Key: FLINK-13011
> URL: https://issues.apache.org/jira/browse/FLINK-13011
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Build System
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Priority: Major
>
> FLINK-12962 adds the ability to build a PyFlink distribution package, but we 
> have not yet released PyFlink to PyPI. The goal of this JIRA is to publish the 
> PyFlink distribution package built by FLINK-12962 to PyPI. https://pypi.org/
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12882) Remove ExecutionAttemptID field from ShuffleEnvironment#createResultPartitionWriters

2019-06-26 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12882:
-
Description: The ResultPartitionID can be obtained directly from the 
ResultPartitionDeploymentDescriptor, so there is no need to pass the 
ExecutionAttemptID to construct a new ResultPartitionID when creating the 
ResultPartition in the factory.  (was: The {{ExecutionAttemptID}} is only used for 
constructing the {{ResultPartitionID}} when creating the {{ResultPartitionWriters}} in 
the {{ShuffleEnvironment}}.

Actually, the {{ResultPartitionID}} can be obtained directly from the 
{{ResultPartitionDeploymentDescriptor}} via 
{{ShuffleDescriptor#getResultPartitionID}}, so we can avoid passing this 
field in the interface to keep it simple.)
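
For illustration, a hedged sketch of the lookup described above 
({{getShuffleDescriptor()}} / {{getResultPartitionID()}} follow the names in this 
description; the exact signatures are assumptions):

{code}
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

final class PartitionIdLookup {

	// The factory derives the ResultPartitionID from the deployment descriptor
	// instead of receiving a separate ExecutionAttemptID.
	static ResultPartitionID partitionIdOf(ResultPartitionDeploymentDescriptor descriptor) {
		return descriptor.getShuffleDescriptor().getResultPartitionID();
	}
}
{code}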

> Remove ExecutionAttemptID field from 
> ShuffleEnvironment#createResultPartitionWriters
> 
>
> Key: FLINK-12882
> URL: https://issues.apache.org/jira/browse/FLINK-12882
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The ResultPartitionID can be obtained directly from the 
> ResultPartitionDeploymentDescriptor, so there is no need to pass the 
> ExecutionAttemptID to construct a new ResultPartitionID when creating the 
> ResultPartition in the factory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12882) Remove ExecutionAttemptID argument from ResultPartitionFactory#create

2019-06-26 Thread zhijiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-12882:
-
Summary: Remove ExecutionAttemptID argument from 
ResultPartitionFactory#create  (was: Remove ExecutionAttemptID field from 
ShuffleEnvironment#createResultPartitionWriters)

> Remove ExecutionAttemptID argument from ResultPartitionFactory#create
> -
>
> Key: FLINK-12882
> URL: https://issues.apache.org/jira/browse/FLINK-12882
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The ResultPartitionID can be obtained directly from the 
> ResultPartitionDeploymentDescriptor, so there is no need to pass the 
> ExecutionAttemptID to construct a new ResultPartitionID when creating the 
> ResultPartition in the factory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] godfreyhe commented on issue #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner

2019-06-26 Thread GitBox
godfreyhe commented on issue #8898: [FLINK-12936][table-planner-blink] Support 
intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#issuecomment-506192171
 
 
   Could you update the commit message to `Support intersect all/minus all in blink planner`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough

2019-06-26 Thread GitBox
knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting 
Started - Table API Example Walkthrough
URL: https://github.com/apache/flink/pull/8903#discussion_r298004677
 
 

 ##
 File path: docs/getting-started/tutorials/table_api.md
 ##
 @@ -0,0 +1,250 @@
+---
+title: "Table API"
+nav-id: tableapitutorials
+nav-title: 'Table API'
+nav-parent_id: apitutorials
+nav-pos: 1
+---
+
+
+Apache Flink offers a Table API as a unified, relational API for batch and 
stream processing, i.e., queries are executed with the same semantics on 
unbounded, real-time streams or bounded, recorded streams and produce the same 
results.
+The Table API in Flink is commonly used to ease the definition of data 
analytics, data pipelining, and ETL applications.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Are We Building? 
+
+In this tutorial, we'll show how to build a continuous ETL pipeline for 
tracking financial transactions by account over time.
+We will start by building our report as a nightly batch job, and then migrate 
to a streaming pipeline to see how batch is just a special case of streaming. 
+
+## Prerequisites
+
+We'll assume that you have some familiarity with Java or Scala, but you should 
be able to follow along even if you're coming from a different programming 
language.
+We'll also assume that you're familiar with basic relational concepts such as 
`SELECT` and `GROUP BY` clauses. 
+
+If you want to follow along you will require a computer with: 
+
+* Java 8 
+* Maven 
+
+## Help, I’m Stuck! 
+
+If you get stuck, check out the [community support 
resources](https://flink.apache.org/community.html).
+In particular, Apache Flink's [user mailing 
list](https://flink.apache.org/community.html#mailing-lists) is consistently 
ranked as one of the most active of any Apache project and a great way to get 
help quickly. 
+
+## Setting up a Maven Project
+
+We are going to use a Flink Maven Archetype for creating our project structure.
+
+{% highlight bash %}
+$ mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink \
+-DarchetypeArtifactId=flink-walkthrough-table \{% unless site.is_stable %}
+
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/
 \{% endunless %}
+-DarchetypeVersion={{ site.version }} \
+-DgroupId=spend-report \
+-DartifactId=spend-report \
+-Dversion=0.1 \
+-Dpackage=spendreport \
+-DinteractiveMode=false
+{% endhighlight %}
+
+{% unless site.is_stable %}
+
+Note: For Maven 3.0 or higher, it is no longer possible to specify 
the repository (-DarchetypeCatalog) via the commandline. If you wish to use the 
snapshot repository, you need to add a repository entry to your settings.xml. 
For details about this change, please refer to the Maven official document: 
http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html
+
+{% endunless %}
+
+You can edit the `groupId`, `artifactId` and `package` if you like. With the 
above parameters,
+Maven will create a project with all the dependencies to complete this 
tutorial.
+After importing the project into your editor you will see a file with the following code. 
+
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+
+tEnv.registerTableSource("transactions", new TransactionTableSource());
+tEnv.registerTableSink("stdout", new StdOutTableSink());
+tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+
+tEnv
+   .scan("transactions")
+   .insertInto("stdout");
+
+env.execute("Spend Report");
+{% endhighlight %}
+
+Let's break down this code by component. 
+
+## Breaking Down The Code
+
+ The Execution Environment
+
+The first two lines set up our `ExecutionEnvironment`.
+The execution environment is how we set properties for our deployments, 
specify whether we are writing a batch or streaming application, and create our 
sources.
+Here we have chosen to use the batch environment since we are building a 
periodic batch report.
+We then wrap it in a `BatchTableEnvironment` so as to have full access to the 
Table API.
+
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+{% endhighlight %}
+
+ Registering Tables
+
+Next, we register tables that we can use for the input and output of our 
application. 
+The TableEnvironment maintains a catalog of tables that are registered by 
name. There are two types of tables, input tables and output tables.
+Input tables can be referenced in Table API and SQL queries and provide input 
data.
+Output tables can be used to emit the result of a Table API or SQL query to an 
external system.
+Tables can support batch queries, streaming queries, or both. 
+
+{% highlight java %}
+tEnv.registerTableSource("transactions", new 

[GitHub] [flink] knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough

2019-06-26 Thread GitBox
knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting 
Started - Table API Example Walkthrough
URL: https://github.com/apache/flink/pull/8903#discussion_r298003160
 
 

 ##
 File path: docs/getting-started/tutorials/table_api.md
 ##
 @@ -0,0 +1,250 @@
+---
+title: "Table API"
+nav-id: tableapitutorials
+nav-title: 'Table API'
+nav-parent_id: apitutorials
+nav-pos: 1
+---
+
+
+Apache Flink offers a Table API as a unified, relational API for batch and 
stream processing, i.e., queries are executed with the same semantics on 
unbounded, real-time streams or bounded, recorded streams and produce the same 
results.
+The Table API in Flink is commonly used to ease the definition of data 
analytics, data pipelining, and ETL applications.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Are We Building? 
+
+In this tutorial, we'll show how to build a continuous ETL pipeline for 
tracking financial transactions by account over time.
+We will start by building our report as a nightly batch job, and then migrate 
to a streaming pipeline to see how batch is just a special case of streaming. 
+
+## Prerequisites
+
+We'll assume that you have some familiarity with Java or Scala, but you should 
be able to follow along even if you're coming from a different programming 
language.
+We'll also assume that you're familiar with basic relational concepts such as 
`SELECT` and `GROUP BY` clauses. 
+
+If you want to follow along you will require a computer with: 
+
+* Java 8 
+* Maven 
+
+## Help, I’m Stuck! 
+
+If you get stuck, check out the [community support 
resources](https://flink.apache.org/community.html).
+In particular, Apache Flink's [user mailing 
list](https://flink.apache.org/community.html#mailing-lists) is consistently 
ranked as one of the most active of any Apache project and a great way to get 
help quickly. 
+
+## Setting up a Maven Project
+
+We are going to use a Flink Maven Archetype for creating our project structure.
+
+{% highlight bash %}
+$ mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink \
+-DarchetypeArtifactId=flink-walkthrough-table \{% unless site.is_stable %}
+
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/
 \{% endunless %}
+-DarchetypeVersion={{ site.version }} \
+-DgroupId=spend-report \
+-DartifactId=spend-report \
+-Dversion=0.1 \
+-Dpackage=spendreport \
+-DinteractiveMode=false
+{% endhighlight %}
+
+{% unless site.is_stable %}
+
+Note: For Maven 3.0 or higher, it is no longer possible to specify 
the repository (-DarchetypeCatalog) via the commandline. If you wish to use the 
snapshot repository, you need to add a repository entry to your settings.xml. 
For details about this change, please refer to the Maven official document: 
http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html
+
+{% endunless %}
+
+You can edit the `groupId`, `artifactId` and `package` if you like. With the 
above parameters,
+Maven will create a project with all the dependencies to complete this 
tutorial.
+After importing the project into your editor you will see a file with the following code. 
+
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+
+tEnv.registerTableSource("transactions", new TransactionTableSource());
+tEnv.registerTableSink("stdout", new StdOutTableSink());
+tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+
+tEnv
+   .scan("transactions")
+   .insertInto("stdout");
+
+env.execute("Spend Report");
+{% endhighlight %}
+
+Let's break down this code by component. 
+
+## Breaking Down The Code
+
+ The Execution Environment
+
+The first two lines set up our `ExecutionEnvironment`.
+The execution environment is how we set properties for our deployments, 
specify whether we are writing a batch or streaming application, and create our 
sources.
+Here we have chosen to use the batch environment since we are building a 
periodic batch report.
+We then wrap it in a `BatchTableEnvironment` so as to have full access to the 
Table API.
+
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+{% endhighlight %}
+
+ Registering Tables
+
+Next, we register tables that we can use for the input and output of our 
application. 
+The TableEnvironment maintains a catalog of tables that are registered by 
name. There are two types of tables, input tables and output tables.
+Input tables can be referenced in Table API and SQL queries and provide input 
data.
+Output tables can be used to emit the result of a Table API or SQL query to an 
external system.
+Tables can support batch queries, streaming queries, or both. 
+
+{% highlight java %}
+tEnv.registerTableSource("transactions", new 

[GitHub] [flink] knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough

2019-06-26 Thread GitBox
knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting 
Started - Table API Example Walkthrough
URL: https://github.com/apache/flink/pull/8903#discussion_r298005605
 
 

 ##
 File path: docs/getting-started/tutorials/table_api.md
 ##
 @@ -0,0 +1,250 @@
+---
+title: "Table API"
+nav-id: tableapitutorials
+nav-title: 'Table API'
+nav-parent_id: apitutorials
+nav-pos: 1
+---
+
+
+Apache Flink offers a Table API as a unified, relational API for batch and 
stream processing, i.e., queries are executed with the same semantics on 
unbounded, real-time streams or bounded, recorded streams and produce the same 
results.
+The Table API in Flink is commonly used to ease the definition of data 
analytics, data pipelining, and ETL applications.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Are We Building? 
+
+In this tutorial, we'll show how to build a continuous ETL pipeline for 
tracking financial transactions by account over time.
+We will start by building our report as a nightly batch job, and then migrate 
to a streaming pipeline to see how batch is just a special case of streaming. 
+
+## Prerequisites
+
+We'll assume that you have some familiarity with Java or Scala, but you should 
be able to follow along even if you're coming from a different programming 
language.
+We'll also assume that you're familiar with basic relational concepts such as 
`SELECT` and `GROUP BY` clauses. 
+
+If you want to follow along you will require a computer with: 
+
+* Java 8 
+* Maven 
+
+## Help, I’m Stuck! 
+
+If you get stuck, check out the [community support 
resources](https://flink.apache.org/community.html).
+In particular, Apache Flink's [user mailing 
list](https://flink.apache.org/community.html#mailing-lists) is consistently 
ranked as one of the most active of any Apache project and a great way to get 
help quickly. 
+
+## Setting up a Maven Project
+
+We are going to use a Flink Maven Archetype for creating our project structure.
+
+{% highlight bash %}
+$ mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink \
+-DarchetypeArtifactId=flink-walkthrough-table \{% unless site.is_stable %}
+
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/
 \{% endunless %}
+-DarchetypeVersion={{ site.version }} \
+-DgroupId=spend-report \
+-DartifactId=spend-report \
+-Dversion=0.1 \
+-Dpackage=spendreport \
+-DinteractiveMode=false
+{% endhighlight %}
+
+{% unless site.is_stable %}
+
+Note: For Maven 3.0 or higher, it is no longer possible to specify 
the repository (-DarchetypeCatalog) via the commandline. If you wish to use the 
snapshot repository, you need to add a repository entry to your settings.xml. 
For details about this change, please refer to the Maven official document: 
http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html
+
+{% endunless %}
+
+You can edit the `groupId`, `artifactId` and `package` if you like. With the 
above parameters,
+Maven will create a project with all the dependencies to complete this 
tutorial.
+After importing the project into your editor you will see a file with the following code. 
+
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+
+tEnv.registerTableSource("transactions", new TransactionTableSource());
+tEnv.registerTableSink("stdout", new StdOutTableSink());
+tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+
+tEnv
+   .scan("transactions")
+   .insertInto("stdout");
+
+env.execute("Spend Report");
+{% endhighlight %}
+
+Let's break down this code by component. 
+
+## Breaking Down The Code
+
+ The Execution Environment
+
+The first two lines set up our `ExecutionEnvironment`.
+The execution environment is how we set properties for our deployments, 
specify whether we are writing a batch or streaming application, and create our 
sources.
+Here we have chosen to use the batch environment since we are building a 
periodic batch report.
+We then wrap it in a `BatchTableEnvironment` so as to have full access to the 
Table API.
+
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+{% endhighlight %}
+
+ Registering Tables
+
+Next, we register tables that we can use for the input and output of our 
application. 
+The TableEnvironment maintains a catalog of tables that are registered by 
name. There are two types of tables, input tables and output tables.
+Input tables can be referenced in Table API and SQL queries and provide input 
data.
+Output tables can be used to emit the result of a Table API or SQL query to an 
external system.
+Tables can support batch queries, streaming queries, or both. 
+
+{% highlight java %}
+tEnv.registerTableSource("transactions", new 

[GitHub] [flink] knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough

2019-06-26 Thread GitBox
knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting 
Started - Table API Example Walkthrough
URL: https://github.com/apache/flink/pull/8903#discussion_r298002892
 
 

 ##
 File path: docs/getting-started/tutorials/table_api.md
 ##
 @@ -0,0 +1,250 @@
+---
+title: "Table API"
+nav-id: tableapitutorials
+nav-title: 'Table API'
+nav-parent_id: apitutorials
+nav-pos: 1
+---
+
+
+Apache Flink offers a Table API as a unified, relational API for batch and 
stream processing, i.e., queries are executed with the same semantics on 
unbounded, real-time streams or bounded, recorded streams and produce the same 
results.
+The Table API in Flink is commonly used to ease the definition of data 
analytics, data pipelining, and ETL applications.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Are We Building? 
+
+In this tutorial, we'll show how to build a continuous ETL pipeline for 
tracking financial transactions by account over time.
+We will start by building our report as a nightly batch job, and then migrate 
to a streaming pipeline to see how batch is just a special case of streaming. 
+
+## Prerequisites
+
+We'll assume that you have some familiarity with Java or Scala, but you should 
be able to follow along even if you're coming from a different programming 
language.
+We'll also assume that you're familiar with basic relational concepts such as 
`SELECT` and `GROUP BY` clauses. 
+
+If you want to follow along you will require a computer with: 
+
+* Java 8 
+* Maven 
+
+## Help, I’m Stuck! 
+
+If you get stuck, check out the [community support 
resources](https://flink.apache.org/community.html).
+In particular, Apache Flink's [user mailing 
list](https://flink.apache.org/community.html#mailing-lists) is consistently 
ranked as one of the most active of any Apache project and a great way to get 
help quickly. 
+
+## Setting up a Maven Project
+
+We are going to use a Flink Maven Archetype for creating our project structure.
+
+{% highlight bash %}
+$ mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink \
+-DarchetypeArtifactId=flink-walkthrough-table \{% unless site.is_stable %}
+
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/
 \{% endunless %}
+-DarchetypeVersion={{ site.version }} \
+-DgroupId=spend-report \
+-DartifactId=spend-report \
+-Dversion=0.1 \
+-Dpackage=spendreport \
+-DinteractiveMode=false
+{% endhighlight %}
+
+{% unless site.is_stable %}
+
+Note: For Maven 3.0 or higher, it is no longer possible to specify 
the repository (-DarchetypeCatalog) via the commandline. If you wish to use the 
snapshot repository, you need to add a repository entry to your settings.xml. 
For details about this change, please refer to the Maven official document: 
http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html
+
+{% endunless %}
+
+You can edit the `groupId`, `artifactId` and `package` if you like. With the 
above parameters,
+Maven will create a project with all the dependencies to complete this 
tutorial.
+After importing the project into your editor you will see a file with the following code. 
+
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+
+tEnv.registerTableSource("transactions", new TransactionTableSource());
+tEnv.registerTableSink("stdout", new StdOutTableSink());
+tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+
+tEnv
+   .scan("transactions")
+   .insertInto("stdout");
+
+env.execute("Spend Report");
+{% endhighlight %}
+
+Let's break down this code by component. 
+
+## Breaking Down The Code
+
+ The Execution Environment
+
+The first two lines set up our `ExecutionEnvironment`.
+The execution environment is how we set properties for our deployments, 
specify whether we are writing a batch or streaming application, and create our 
sources.
+Here we have chosen to use the batch environment since we are building a 
periodic batch report.
+We then wrap it in a `BatchTableEnvironment` so as to have full access to the Table API.
+
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+{% endhighlight %}
+
+ Registering Tables
+
+Next, we register tables that we can use for the input and output of our 
application. 
+The TableEnvironment maintains a catalog of tables that are registered by 
name. There are two types of tables, input tables and output tables.
+Input tables can be referenced in Table API and SQL queries and provide input 
data.
+Output tables can be used to emit the result of a Table API or SQL query to an 
external system.
+Tables can support batch queries, streaming queries, or both. 
+
+{% highlight java %}
 
 Review comment:
   We should also add Scala here, shouldn't we?


[GitHub] [flink] knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough

2019-06-26 Thread GitBox
knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting 
Started - Table API Example Walkthrough
URL: https://github.com/apache/flink/pull/8903#discussion_r298004640
 
 

 ##
 File path: docs/getting-started/tutorials/table_api.md
 ##
 @@ -0,0 +1,250 @@
+---
+title: "Table API"
+nav-id: tableapitutorials
+nav-title: 'Table API'
+nav-parent_id: apitutorials
+nav-pos: 1
+---
+
+
+Apache Flink offers a Table API as a unified, relational API for batch and 
stream processing, i.e., queries are executed with the same semantics on 
unbounded, real-time streams or bounded, recorded streams and produce the same 
results.
+The Table API in Flink is commonly used to ease the definition of data 
analytics, data pipelining, and ETL applications.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Are We Building? 
+
+In this tutorial, we'll show how to build a continuous ETL pipeline for 
tracking financial transactions by account over time.
+We will start by building our report as a nightly batch job, and then migrate 
to a streaming pipeline to see how batch is just a special case of streaming. 
+
+## Prerequisites
+
+We'll assume that you have some familiarity with Java or Scala, but you should 
be able to follow along even if you're coming from a different programming 
language.
+We'll also assume that you're familiar with basic relational concepts such as 
`SELECT` and `GROUP BY` clauses. 
+
+If you want to follow along you will require a computer with: 
+
+* Java 8 
+* Maven 
+
+## Help, I’m Stuck! 
+
+If you get stuck, check out the [community support 
resources](https://flink.apache.org/community.html).
+In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) is consistently ranked as one of the most active of any Apache project and a great way to get help quickly. 
+
+## Setting up a Maven Project
+
+We are going to use a Flink Maven Archetype for creating our project structure.
+
+{% highlight bash %}
+$ mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink \
+-DarchetypeArtifactId=flink-walkthrough-table \{% unless site.is_stable %}
+
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/
 \{% endunless %}
+-DarchetypeVersion={{ site.version }} \
+-DgroupId=spend-report \
+-DartifactId=spend-report \
+-Dversion=0.1 \
+-Dpackage=spendreport \
+-DinteractiveMode=false
+{% endhighlight %}
+
+{% unless site.is_stable %}
+
+Note: For Maven 3.0 or higher, it is no longer possible to specify 
the repository (-DarchetypeCatalog) via the commandline. If you wish to use the 
snapshot repository, you need to add a repository entry to your settings.xml. 
For details about this change, please refer to the Maven official document: http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html
+
+{% endunless %}
+
+You can edit the `groupId`, `artifactId` and `package` if you like. With the 
above parameters,
+Maven will create a project with all the dependencies to complete this 
tutorial.
+After importing the project into your editor, you will see a file with the following code.
+
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+
+tEnv.registerTableSource("transactions", new TransactionTableSource());
+tEnv.registerTableSink("stdout", new StdOutTableSink());
+tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+
+tEnv
+   .scan("transactions")
+   .insertInto("stdout");
+
+env.execute("Spend Report");
+{% endhighlight %}
+
+Let's break down this code by component. 
+
+## Breaking Down The Code
+
+ The Execution Environment
+
+The first two lines set up our `ExecutionEnvironment`.
+The execution environment is how we set properties for our deployments, 
specify whether we are writing a batch or streaming application, and create our 
sources.
+Here we have chosen to use the batch environment since we are building a 
periodic batch report.
+We then wrap it in a `BatchTableEnvironment` so as to have full access to the Table API.
+
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+{% endhighlight %}
+
+ Registering Tables
+
+Next, we register tables that we can use for the input and output of our 
application. 
+The TableEnvironment maintains a catalog of tables that are registered by 
name. There are two types of tables, input tables and output tables.
+Input tables can be referenced in Table API and SQL queries and provide input 
data.
+Output tables can be used to emit the result of a Table API or SQL query to an 
external system.
+Tables can support batch queries, streaming queries, or both. 
+
+{% highlight java %}
+tEnv.registerTableSource("transactions", new TransactionTableSource());

[GitHub] [flink] knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough

2019-06-26 Thread GitBox
knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting 
Started - Table API Example Walkthrough
URL: https://github.com/apache/flink/pull/8903#discussion_r298003884
 
 

 ##
 File path: docs/getting-started/tutorials/table_api.md
 ##
 @@ -0,0 +1,250 @@
+---
+title: "Table API"
+nav-id: tableapitutorials
+nav-title: 'Table API'
+nav-parent_id: apitutorials
+nav-pos: 1
+---
+
+
+Apache Flink offers a Table API as a unified, relational API for batch and 
stream processing, i.e., queries are executed with the same semantics on 
unbounded, real-time streams or bounded, recorded streams and produce the same 
results.
+The Table API in Flink is commonly used to ease the definition of data 
analytics, data pipelining, and ETL applications.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Are We Building? 
+
+In this tutorial, we'll show how to build a continuous ETL pipeline for 
tracking financial transactions by account over time.
+We will start by building our report as a nightly batch job, and then migrate 
to a streaming pipeline to see how batch is just a special case of streaming. 
+
+## Prerequisites
+
+We'll assume that you have some familiarity with Java or Scala, but you should 
be able to follow along even if you're coming from a different programming 
language.
+We'll also assume that you're familiar with basic relational concepts such as 
`SELECT` and `GROUP BY` clauses. 
+
+If you want to follow along you will require a computer with: 
+
+* Java 8 
+* Maven 
+
+## Help, I’m Stuck! 
+
+If you get stuck, check out the [community support 
resources](https://flink.apache.org/community.html).
+In particular, Apache Flink's [user mailing 
list](https://flink.apache.org/community.html#mailing-lists) is consistently 
ranked as one of the most active of any Apache project and a great way to get 
help quickly. 
+
+## Setting up a Maven Project
+
+We are going to use a Flink Maven Archetype for creating our project structure.
+
+{% highlight bash %}
+$ mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink \
+-DarchetypeArtifactId=flink-walkthrough-table \{% unless site.is_stable %}
+
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/
 \{% endunless %}
+-DarchetypeVersion={{ site.version }} \
+-DgroupId=spend-report \
+-DartifactId=spend-report \
+-Dversion=0.1 \
+-Dpackage=spendreport \
+-DinteractiveMode=false
+{% endhighlight %}
+
+{% unless site.is_stable %}
+
+Note: For Maven 3.0 or higher, it is no longer possible to specify 
the repository (-DarchetypeCatalog) via the commandline. If you wish to use the 
snapshot repository, you need to add a repository entry to your settings.xml. 
For details about this change, please refer to the Maven official document: http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html
+
+{% endunless %}
+
+You can edit the `groupId`, `artifactId` and `package` if you like. With the 
above parameters,
+Maven will create a project with all the dependencies to complete this 
tutorial.
+After importing the project into your editor, you will see a file with the following code.
+
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+
+tEnv.registerTableSource("transactions", new TransactionTableSource());
+tEnv.registerTableSink("stdout", new StdOutTableSink());
+tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+
+tEnv
+   .scan("transactions")
+   .insertInto("stdout");
+
+env.execute("Spend Report");
+{% endhighlight %}
+
+Let's break down this code by component. 
+
+## Breaking Down The Code
+
+ The Execution Environment
+
+The first two lines set up our `ExecutionEnvironment`.
+The execution environment is how we set properties for our deployments, 
specify whether we are writing a batch or streaming application, and create our 
sources.
+Here we have chosen to use the batch environment since we are building a 
periodic batch report.
+We then wrap it in a `BatchTableEnvironment` so as to have full access to the Table API.
+
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+{% endhighlight %}
+
+ Registering Tables
+
+Next, we register tables that we can use for the input and output of our 
application. 
+The TableEnvironment maintains a catalog of tables that are registered by 
name. There are two types of tables, input tables and output tables.
+Input tables can be referenced in Table API and SQL queries and provide input 
data.
+Output tables can be used to emit the result of a Table API or SQL query to an 
external system.
+Tables can support batch queries, streaming queries, or both. 
+
+{% highlight java %}
+tEnv.registerTableSource("transactions", new 

[GitHub] [flink] knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough

2019-06-26 Thread GitBox
knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting 
Started - Table API Example Walkthrough
URL: https://github.com/apache/flink/pull/8903#discussion_r298004962
 
 

 ##
 File path: docs/getting-started/tutorials/table_api.md
 ##
 @@ -0,0 +1,250 @@
+---
+title: "Table API"
+nav-id: tableapitutorials
+nav-title: 'Table API'
+nav-parent_id: apitutorials
+nav-pos: 1
+---
+
+
+Apache Flink offers a Table API as a unified, relational API for batch and 
stream processing, i.e., queries are executed with the same semantics on 
unbounded, real-time streams or bounded, recorded streams and produce the same 
results.
+The Table API in Flink is commonly used to ease the definition of data 
analytics, data pipelining, and ETL applications.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Are We Building? 
+
+In this tutorial, we'll show how to build a continuous ETL pipeline for 
tracking financial transactions by account over time.
+We will start by building our report as a nightly batch job, and then migrate 
to a streaming pipeline to see how batch is just a special case of streaming. 
+
+## Prerequisites
+
+We'll assume that you have some familiarity with Java or Scala, but you should 
be able to follow along even if you're coming from a different programming 
language.
+We'll also assume that you're familiar with basic relational concepts such as 
`SELECT` and `GROUP BY` clauses. 
+
+If you want to follow along you will require a computer with: 
+
+* Java 8 
+* Maven 
+
+## Help, I’m Stuck! 
+
+If you get stuck, check out the [community support 
resources](https://flink.apache.org/community.html).
+In particular, Apache Flink's [user mailing 
list](https://flink.apache.org/community.html#mailing-lists) is consistently 
ranked as one of the most active of any Apache project and a great way to get 
help quickly. 
+
+## Setting up a Maven Project
+
+We are going to use a Flink Maven Archetype for creating our project structure.
+
+{% highlight bash %}
+$ mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink \
+-DarchetypeArtifactId=flink-walkthrough-table \{% unless site.is_stable %}
+
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/
 \{% endunless %}
+-DarchetypeVersion={{ site.version }} \
+-DgroupId=spend-report \
+-DartifactId=spend-report \
+-Dversion=0.1 \
+-Dpackage=spendreport \
+-DinteractiveMode=false
+{% endhighlight %}
+
+{% unless site.is_stable %}
+
+Note: For Maven 3.0 or higher, it is no longer possible to specify 
the repository (-DarchetypeCatalog) via the commandline. If you wish to use the 
snapshot repository, you need to add a repository entry to your settings.xml. 
For details about this change, please refer to the Maven official document: http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html
+
+{% endunless %}
+
+You can edit the `groupId`, `artifactId` and `package` if you like. With the 
above parameters,
+Maven will create a project with all the dependencies to complete this 
tutorial.
+After importing the project into your editor, you will see a file with the following code.
+
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+
+tEnv.registerTableSource("transactions", new TransactionTableSource());
+tEnv.registerTableSink("stdout", new StdOutTableSink());
+tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+
+tEnv
+   .scan("transactions")
+   .insertInto("stdout");
+
+env.execute("Spend Report");
+{% endhighlight %}
+
+Let's break down this code by component. 
+
+## Breaking Down The Code
+
+ The Execution Environment
+
+The first two lines set up our `ExecutionEnvironment`.
+The execution environment is how we set properties for our deployments, 
specify whether we are writing a batch or streaming application, and create our 
sources.
+Here we have chosen to use the batch environment since we are building a 
periodic batch report.
+We then wrap it in a `BatchTableEnvironment` so as to have full access to the Table API.
+
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+{% endhighlight %}
+
+ Registering Tables
+
+Next, we register tables that we can use for the input and output of our 
application. 
+The TableEnvironment maintains a catalog of tables that are registered by 
name. There are two types of tables, input tables and output tables.
+Input tables can be referenced in Table API and SQL queries and provide input 
data.
+Output tables can be used to emit the result of a Table API or SQL query to an 
external system.
+Tables can support batch queries, streaming queries, or both. 
+
+{% highlight java %}
+tEnv.registerTableSource("transactions", new 

[GitHub] [flink] knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough

2019-06-26 Thread GitBox
knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting 
Started - Table API Example Walkthrough
URL: https://github.com/apache/flink/pull/8903#discussion_r298003032
 
 

 ##
 File path: docs/getting-started/tutorials/table_api.md
 ##
 @@ -0,0 +1,250 @@
+---
+title: "Table API"
+nav-id: tableapitutorials
+nav-title: 'Table API'
+nav-parent_id: apitutorials
+nav-pos: 1
+---
+
+
+Apache Flink offers a Table API as a unified, relational API for batch and 
stream processing, i.e., queries are executed with the same semantics on 
unbounded, real-time streams or bounded, recorded streams and produce the same 
results.
+The Table API in Flink is commonly used to ease the definition of data 
analytics, data pipelining, and ETL applications.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Are We Building? 
+
+In this tutorial, we'll show how to build a continuous ETL pipeline for 
tracking financial transactions by account over time.
+We will start by building our report as a nightly batch job, and then migrate 
to a streaming pipeline to see how batch is just a special case of streaming. 
+
+## Prerequisites
+
+We'll assume that you have some familiarity with Java or Scala, but you should 
be able to follow along even if you're coming from a different programming 
language.
+We'll also assume that you're familiar with basic relational concepts such as 
`SELECT` and `GROUP BY` clauses. 
+
+If you want to follow along you will require a computer with: 
+
+* Java 8 
+* Maven 
+
+## Help, I’m Stuck! 
+
+If you get stuck, check out the [community support 
resources](https://flink.apache.org/community.html).
+In particular, Apache Flink's [user mailing 
list](https://flink.apache.org/community.html#mailing-lists) is consistently 
ranked as one of the most active of any Apache project and a great way to get 
help quickly. 
+
+## Setting up a Maven Project
+
+We are going to use a Flink Maven Archetype for creating our project structure.
+
+{% highlight bash %}
+$ mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink \
+-DarchetypeArtifactId=flink-walkthrough-table \{% unless site.is_stable %}
+
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/
 \{% endunless %}
+-DarchetypeVersion={{ site.version }} \
+-DgroupId=spend-report \
+-DartifactId=spend-report \
+-Dversion=0.1 \
+-Dpackage=spendreport \
+-DinteractiveMode=false
+{% endhighlight %}
+
+{% unless site.is_stable %}
+
+Note: For Maven 3.0 or higher, it is no longer possible to specify 
the repository (-DarchetypeCatalog) via the commandline. If you wish to use the 
snapshot repository, you need to add a repository entry to your settings.xml. 
For details about this change, please refer to the Maven official document: http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html
+
+{% endunless %}
+
+You can edit the `groupId`, `artifactId` and `package` if you like. With the 
above parameters,
+Maven will create a project with all the dependencies to complete this 
tutorial.
+After importing the project into your editor, you will see a file with the following code.
+
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+
+tEnv.registerTableSource("transactions", new TransactionTableSource());
+tEnv.registerTableSink("stdout", new StdOutTableSink());
+tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+
+tEnv
+   .scan("transactions")
+   .insertInto("stdout");
+
+env.execute("Spend Report");
+{% endhighlight %}
+
+Let's break down this code by component. 
+
+## Breaking Down The Code
+
+ The Execution Environment
+
+The first two lines set up our `ExecutionEnvironment`.
+The execution environment is how we set properties for our deployments, 
specify whether we are writing a batch or streaming application, and create our 
sources.
+Here we have chosen to use the batch environment since we are building a 
periodic batch report.
+We then wrap it in a `BatchTableEnvironment` so as to have full access to the Table API.
+
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+{% endhighlight %}
+
+ Registering Tables
+
+Next, we register tables that we can use for the input and output of our 
application. 
+The TableEnvironment maintains a catalog of tables that are registered by 
name. There are two types of tables, input tables and output tables.
+Input tables can be referenced in Table API and SQL queries and provide input 
data.
 
 Review comment:
   Maybe mention that this corresponds to "sinks" and "sources" in other 
frameworks/APIs.


This is an automated message from the Apache Git Service.
To respond to the 

[GitHub] [flink] knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough

2019-06-26 Thread GitBox
knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting 
Started - Table API Example Walkthrough
URL: https://github.com/apache/flink/pull/8903#discussion_r298003359
 
 

 ##
 File path: docs/getting-started/tutorials/table_api.md
 ##
 @@ -0,0 +1,250 @@
+---
+title: "Table API"
+nav-id: tableapitutorials
+nav-title: 'Table API'
+nav-parent_id: apitutorials
+nav-pos: 1
+---
+
+
+Apache Flink offers a Table API as a unified, relational API for batch and 
stream processing, i.e., queries are executed with the same semantics on 
unbounded, real-time streams or bounded, recorded streams and produce the same 
results.
+The Table API in Flink is commonly used to ease the definition of data 
analytics, data pipelining, and ETL applications.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Are We Building? 
+
+In this tutorial, we'll show how to build a continuous ETL pipeline for 
tracking financial transactions by account over time.
+We will start by building our report as a nightly batch job, and then migrate 
to a streaming pipeline to see how batch is just a special case of streaming. 
+
+## Prerequisites
+
+We'll assume that you have some familiarity with Java or Scala, but you should 
be able to follow along even if you're coming from a different programming 
language.
+We'll also assume that you're familiar with basic relational concepts such as 
`SELECT` and `GROUP BY` clauses. 
+
+If you want to follow along you will require a computer with: 
+
+* Java 8 
+* Maven 
+
+## Help, I’m Stuck! 
+
+If you get stuck, check out the [community support 
resources](https://flink.apache.org/community.html).
+In particular, Apache Flink's [user mailing 
list](https://flink.apache.org/community.html#mailing-lists) is consistently 
ranked as one of the most active of any Apache project and a great way to get 
help quickly. 
+
+## Setting up a Maven Project
+
+We are going to use a Flink Maven Archetype for creating our project structure.
+
+{% highlight bash %}
+$ mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink \
+-DarchetypeArtifactId=flink-walkthrough-table \{% unless site.is_stable %}
+
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/
 \{% endunless %}
+-DarchetypeVersion={{ site.version }} \
+-DgroupId=spend-report \
+-DartifactId=spend-report \
+-Dversion=0.1 \
+-Dpackage=spendreport \
+-DinteractiveMode=false
+{% endhighlight %}
+
+{% unless site.is_stable %}
+
+Note: For Maven 3.0 or higher, it is no longer possible to specify 
the repository (-DarchetypeCatalog) via the commandline. If you wish to use the 
snapshot repository, you need to add a repository entry to your settings.xml. 
For details about this change, please refer to the Maven official document: http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html
+
+{% endunless %}
+
+You can edit the `groupId`, `artifactId` and `package` if you like. With the 
above parameters,
+Maven will create a project with all the dependencies to complete this 
tutorial.
+After importing the project into your editor, you will see a file with the following code.
+
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+
+tEnv.registerTableSource("transactions", new TransactionTableSource());
+tEnv.registerTableSink("stdout", new StdOutTableSink());
+tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+
+tEnv
+   .scan("transactions")
+   .insertInto("stdout");
+
+env.execute("Spend Report");
+{% endhighlight %}
+
+Let's break down this code by component. 
+
+## Breaking Down The Code
+
+ The Execution Environment
+
+The first two lines set up our `ExecutionEnvironment`.
+The execution environment is how we set properties for our deployments, 
specify whether we are writing a batch or streaming application, and create our 
sources.
+Here we have chosen to use the batch environment since we are building a 
periodic batch report.
+We then wrap it in a `BatchTableEnvironment` so as to have full access to the Table API.
+
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+{% endhighlight %}
+
+ Registering Tables
+
+Next, we register tables that we can use for the input and output of our 
application. 
+The TableEnvironment maintains a catalog of tables that are registered by 
name. There are two types of tables, input tables and output tables.
+Input tables can be referenced in Table API and SQL queries and provide input 
data.
+Output tables can be used to emit the result of a Table API or SQL query to an 
external system.
+Tables can support batch queries, streaming queries, or both. 
+
+{% highlight java %}
+tEnv.registerTableSource("transactions", new 

[GitHub] [flink] knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough

2019-06-26 Thread GitBox
knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting 
Started - Table API Example Walkthrough
URL: https://github.com/apache/flink/pull/8903#discussion_r298004832
 
 

 ##
 File path: docs/getting-started/tutorials/table_api.md
 ##
 @@ -0,0 +1,250 @@
+---
+title: "Table API"
+nav-id: tableapitutorials
+nav-title: 'Table API'
+nav-parent_id: apitutorials
+nav-pos: 1
+---
+
+
+Apache Flink offers a Table API as a unified, relational API for batch and 
stream processing, i.e., queries are executed with the same semantics on 
unbounded, real-time streams or bounded, recorded streams and produce the same 
results.
+The Table API in Flink is commonly used to ease the definition of data 
analytics, data pipelining, and ETL applications.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Are We Building? 
+
+In this tutorial, we'll show how to build a continuous ETL pipeline for 
tracking financial transactions by account over time.
+We will start by building our report as a nightly batch job, and then migrate 
to a streaming pipeline to see how batch is just a special case of streaming. 
+
+## Prerequisites
+
+We'll assume that you have some familiarity with Java or Scala, but you should 
be able to follow along even if you're coming from a different programming 
language.
+We'll also assume that you're familiar with basic relational concepts such as 
`SELECT` and `GROUP BY` clauses. 
+
+If you want to follow along you will require a computer with: 
+
+* Java 8 
+* Maven 
+
+## Help, I’m Stuck! 
+
+If you get stuck, check out the [community support 
resources](https://flink.apache.org/community.html).
+In particular, Apache Flink's [user mailing 
list](https://flink.apache.org/community.html#mailing-lists) is consistently 
ranked as one of the most active of any Apache project and a great way to get 
help quickly. 
+
+## Setting up a Maven Project
+
+We are going to use a Flink Maven Archetype for creating our project structure.
+
+{% highlight bash %}
+$ mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink \
+-DarchetypeArtifactId=flink-walkthrough-table \{% unless site.is_stable %}
+
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/
 \{% endunless %}
+-DarchetypeVersion={{ site.version }} \
+-DgroupId=spend-report \
+-DartifactId=spend-report \
+-Dversion=0.1 \
+-Dpackage=spendreport \
+-DinteractiveMode=false
+{% endhighlight %}
+
+{% unless site.is_stable %}
+
+Note: For Maven 3.0 or higher, it is no longer possible to specify 
the repository (-DarchetypeCatalog) via the commandline. If you wish to use the 
snapshot repository, you need to add a repository entry to your settings.xml. 
For details about this change, please refer to the Maven official document: http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html
+
+{% endunless %}
+
+You can edit the `groupId`, `artifactId` and `package` if you like. With the 
above parameters,
+Maven will create a project with all the dependencies to complete this 
tutorial.
+After importing the project into your editor, you will see a file with the following code.
+
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+
+tEnv.registerTableSource("transactions", new TransactionTableSource());
+tEnv.registerTableSink("stdout", new StdOutTableSink());
+tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+
+tEnv
+   .scan("transactions")
+   .insertInto("stdout");
+
+env.execute("Spend Report");
+{% endhighlight %}
+
+Let's break down this code by component. 
+
+## Breaking Down The Code
+
+ The Execution Environment
+
+The first two lines set up our `ExecutionEnvironment`.
+The execution environment is how we set properties for our deployments, 
specify whether we are writing a batch or streaming application, and create our 
sources.
+Here we have chosen to use the batch environment since we are building a 
periodic batch report.
+We then wrap it in a `BatchTableEnvironment` so as to have full access to the Table API.
+
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+{% endhighlight %}
+
+ Registering Tables
+
+Next, we register tables that we can use for the input and output of our 
application. 
+The TableEnvironment maintains a catalog of tables that are registered by 
name. There are two types of tables, input tables and output tables.
+Input tables can be referenced in Table API and SQL queries and provide input 
data.
+Output tables can be used to emit the result of a Table API or SQL query to an 
external system.
+Tables can support batch queries, streaming queries, or both. 
+
+{% highlight java %}
+tEnv.registerTableSource("transactions", new 

[GitHub] [flink] knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough

2019-06-26 Thread GitBox
knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting 
Started - Table API Example Walkthrough
URL: https://github.com/apache/flink/pull/8903#discussion_r298003620
 
 

 ##
 File path: docs/getting-started/tutorials/table_api.md
 ##
 @@ -0,0 +1,250 @@
+---
+title: "Table API"
+nav-id: tableapitutorials
+nav-title: 'Table API'
+nav-parent_id: apitutorials
+nav-pos: 1
+---
+
+
+Apache Flink offers a Table API as a unified, relational API for batch and 
stream processing, i.e., queries are executed with the same semantics on 
unbounded, real-time streams or bounded, recorded streams and produce the same 
results.
+The Table API in Flink is commonly used to ease the definition of data 
analytics, data pipelining, and ETL applications.
+
+* This will be replaced by the TOC
+{:toc}
+
+## What Are We Building? 
+
+In this tutorial, we'll show how to build a continuous ETL pipeline for 
tracking financial transactions by account over time.
+We will start by building our report as a nightly batch job, and then migrate 
to a streaming pipeline to see how batch is just a special case of streaming. 
+
+## Prerequisites
+
+We'll assume that you have some familiarity with Java or Scala, but you should 
be able to follow along even if you're coming from a different programming 
language.
+We'll also assume that you're familiar with basic relational concepts such as 
`SELECT` and `GROUP BY` clauses. 
+
+If you want to follow along you will require a computer with: 
+
+* Java 8 
+* Maven 
+
+## Help, I’m Stuck! 
+
+If you get stuck, check out the [community support 
resources](https://flink.apache.org/community.html).
+In particular, Apache Flink's [user mailing 
list](https://flink.apache.org/community.html#mailing-lists) is consistently 
ranked as one of the most active of any Apache project and a great way to get 
help quickly. 
+
+## Setting up a Maven Project
+
+We are going to use a Flink Maven Archetype for creating our project structure.
+
+{% highlight bash %}
+$ mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink \
+-DarchetypeArtifactId=flink-walkthrough-table \{% unless site.is_stable %}
+
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/
 \{% endunless %}
+-DarchetypeVersion={{ site.version }} \
+-DgroupId=spend-report \
+-DartifactId=spend-report \
+-Dversion=0.1 \
+-Dpackage=spendreport \
+-DinteractiveMode=false
+{% endhighlight %}
+
+{% unless site.is_stable %}
+
+Note: For Maven 3.0 or higher, it is no longer possible to specify 
the repository (-DarchetypeCatalog) via the commandline. If you wish to use the 
snapshot repository, you need to add a repository entry to your settings.xml. 
For details about this change, please refer to the Maven official document: http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html
+
+{% endunless %}
+
+You can edit the `groupId`, `artifactId` and `package` if you like. With the 
above parameters,
+Maven will create a project with all the dependencies to complete this 
tutorial.
+After importing the project into your editor, you will see a file with the following code.
+
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+
+tEnv.registerTableSource("transactions", new TransactionTableSource());
+tEnv.registerTableSink("stdout", new StdOutTableSink());
+tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+
+tEnv
+   .scan("transactions")
+   .insertInto("stdout");
+
+env.execute("Spend Report");
+{% endhighlight %}
+
+Let's break down this code by component. 
+
+## Breaking Down The Code
+
+ The Execution Environment
+
+The first two lines set up our `ExecutionEnvironment`.
+The execution environment is how we set properties for our deployments, 
specify whether we are writing a batch or streaming application, and create our 
sources.
+Here we have chosen to use the batch environment since we are building a 
periodic batch report.
+We then wrap it in a `BatchTableEnvironment` so as to have full access to the Table API.
+
+{% highlight java %}
+ExecutionEnvironment env   = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+{% endhighlight %}
+
+ Registering Tables
+
+Next, we register tables that we can use for the input and output of our 
application. 
+The TableEnvironment maintains a catalog of tables that are registered by 
name. There are two types of tables, input tables and output tables.
+Input tables can be referenced in Table API and SQL queries and provide input 
data.
+Output tables can be used to emit the result of a Table API or SQL query to an 
external system.
+Tables can support batch queries, streaming queries, or both. 
+
+{% highlight java %}
+tEnv.registerTableSource("transactions", new 

[jira] [Commented] (FLINK-13004) Correct the logic of needToCleanupState in KeyedProcessFunctionWithCleanupState

2019-06-26 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873815#comment-16873815
 ] 

Yun Tang commented on FLINK-13004:
--

[~jark], actually, the {{timestamp}} passed in was a wrapper {{Long}}. For the problem of comparing wrapper and primitive values, you could refer to [this question|https://stackoverflow.com/questions/19485818/what-are-not-2-long-variables-equal-with-operator-to-compare-in-java].

I am preparing a PR to fix this and found that previous tests had been modified to cater to this bug.
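
To make the pitfall concrete, here is a minimal, self-contained Java sketch (not Flink code; the class name and values are hypothetical) showing how "==" behaves on boxed {{Long}} values outside the small-value cache:

{code:java}
// Hypothetical illustration of the wrapper-Long comparison pitfall:
// for values outside the [-128, 127] cache, autoboxing creates distinct
// objects, so "==" compares references and returns false even when the
// numeric values are equal.
public class BoxedLongComparison {
   public static void main(String[] args) {
      Long cleanupTime = 200L;   // hypothetical cleanup time
      Long timestamp = 200L;     // hypothetical timer timestamp

      System.out.println(timestamp == cleanupTime);          // false: reference comparison
      System.out.println(timestamp.equals(cleanupTime));     // true: value comparison
      System.out.println(timestamp.longValue() == cleanupTime.longValue()); // true: primitive comparison

      Long small = 100L;
      Long alsoSmall = 100L;
      System.out.println(small == alsoSmall);                // true, but only because of the cache
   }
}
{code}

Comparing with {{equals}} (or unboxing to a primitive {{long}} before comparing) avoids the problem.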

> Correct the logic of needToCleanupState in 
> KeyedProcessFunctionWithCleanupState
> ---
>
> Key: FLINK-13004
> URL: https://issues.apache.org/jira/browse/FLINK-13004
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>
> The current implementation of needToCleanupState in 
> KeyedProcessFunctionWithCleanupState actually has a potential bug:
> {code:java}
> protected Boolean needToCleanupState(Long timestamp) throws IOException {
>if (stateCleaningEnabled) {
>   Long cleanupTime = cleanupTimeState.value();
>   // check that the triggered timer is the last registered processing 
> time timer.
>   return null != cleanupTime && timestamp == cleanupTime;
>} else {
>   return false;
>}
> }
> {code}
> Please note that it directly uses "==" to judge whether the *Long*-typed timestamp 
> and cleanupTime are equal. However, if the value is larger than 127L, the 
> comparison would actually return false instead of the expected true.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner

2019-06-26 Thread GitBox
godfreyhe commented on a change in pull request #8898: 
[FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297967960
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import 
org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.{GREATER_THAN, 
GREATER_THAN_OR_EQUAL, IF}
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.Intersect
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.util.Util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Replaces logical [[Intersect]] operator using a combination of Union, 
Aggregate
+  * and Generate operator.
+  *
+  * Input Query :
+  * {{{
+  *SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2
+  * }}}
+  *
+  * Rewritten Query:
+  * {{{
+  *   SELECT c1
+  *   FROM (
+  *SELECT replicate_row(min_count, c1)
 
 Review comment:
   A UDTF (`replicate_row`) cannot be used in the `select` clause; please use `LATERAL TABLE(replicate_row(XXX))` instead.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner

2019-06-26 Thread GitBox
godfreyhe commented on a change in pull request #8898: 
[FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297985745
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteMinusAll.scala
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.GREATER_THAN
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.Minus
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.SqlTypeName.BIGINT
+import org.apache.calcite.util.Util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Replaces logical [[Minus]] operator using a combination of Union, Aggregate
+  * and Generate operator.
+  *
+  * Input Query :
+  * {{{
+  *SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2
+  * }}}
+  *
+  * Rewritten Query:
+  * {{{
+  *   SELECT c1
+  *   FROM (
+  * SELECT replicate_rows(sum_val, c1)
+  *   FROM (
+  * SELECT c1, sum_val
+  *   FROM (
+  * SELECT c1, sum(vcol) AS sum_val
+  *   FROM (
+  * SELECT c1, 1L as vcol FROM ut1
+  * UNION ALL
+  * SELECT c1, -1L as vcol FROM ut2
+  *  ) AS union_all
+  *GROUP BY union_all.c1
+  *  )
+  *WHERE sum_val > 0
+  *   )
+  *   )
+  * }}}
+  */
+class RewriteMinusAll extends ReplaceSetOpWithJoinRuleBase(
+  classOf[Minus], "RewriteMinusAll") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val minus: Minus = call.rel(0)
+!minus.isDistinct
 
 Review comment:
   `minus.all`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner

2019-06-26 Thread GitBox
godfreyhe commented on a change in pull request #8898: 
[FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r298000417
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.xml
 ##
 @@ -40,6 +40,34 @@ GroupAggregate(groupBy=[c], select=[c])
   +- Exchange(distribution=[hash[f]])
  +- Calc(select=[f])
 +- TableSourceScan(table=[[default_catalog, default_database, T2, 
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+]]>
+
+  
+  
 
 Review comment:
   Should this case be in the second commit?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner

2019-06-26 Thread GitBox
godfreyhe commented on a change in pull request #8898: 
[FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297973779
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import 
org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.{GREATER_THAN, 
GREATER_THAN_OR_EQUAL, IF}
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.Intersect
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.util.Util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Replaces logical [[Intersect]] operator using a combination of Union, 
Aggregate
+  * and Generate operator.
+  *
+  * Input Query :
+  * {{{
+  *SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2
+  * }}}
+  *
+  * Rewritten Query:
+  * {{{
+  *   SELECT c1
+  *   FROM (
+  *SELECT replicate_row(min_count, c1)
+  *FROM (
+  * SELECT c1, If (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS 
min_count
+  * FROM (
+  *  SELECT   c1, count(vcol1) as vcol1_cnt, count(vcol2) as 
vcol2_cnt
+  *  FROM (
+  *   SELECT c1, true as vcol1, null as vcol2 FROM ut1
+  *   UNION ALL
+  *   SELECT c1, null as vcol1, true as vcol2 FROM ut2
+  *   ) AS union_all
+  *  GROUP BY c1
+  *  HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1
+  *  )
+  * )
+  * )
+  * }}}
+  */
+class RewriteIntersectAll extends ReplaceSetOpWithJoinRuleBase(
+  classOf[Intersect], "RewriteIntersectAll") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val intersect: Intersect = call.rel(0)
+!intersect.isDistinct
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val intersect: Intersect = call.rel(0)
+val left = intersect.getInput(0)
+val right = intersect.getInput(1)
+
+val fields = Util.range(intersect.getRowType.getFieldCount)
+
+val builder1 = call.builder
 
 Review comment:
   builder1 => leftBuilder ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner

2019-06-26 Thread GitBox
godfreyhe commented on a change in pull request #8898: 
[FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297962213
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
 ##
 @@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, 
LONG_TYPE_INFO, STRING_TYPE_INFO}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableConfigOptions
+import org.apache.flink.table.runtime.batch.sql.join.JoinITCaseHelper
+import org.apache.flink.table.runtime.batch.sql.join.JoinType.{JoinType, _}
+import org.apache.flink.table.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.runtime.utils.TestData._
+import org.apache.flink.table.runtime.utils.{BatchScalaTableEnvUtil, 
BatchTableEnvUtil, BatchTestBase}
+
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Before, Test}
+
+import java.util
+
+import scala.util.Random
+
+@RunWith(classOf[Parameterized])
+class SetOperatorsITCase(joinType: JoinType) extends BatchTestBase {
+
+  @Before
+  def before(): Unit = {
+
tEnv.getConfig.getConf.setInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM,
 3)
+registerCollection("AllNullTable3", allNullData3, type3, "a, b, c")
+registerCollection("SmallTable3", smallData3, type3, "a, b, c")
+registerCollection("Table3", data3, type3, "a, b, c")
+registerCollection("Table5", data5, type5, "a, b, c, d, e")
+JoinITCaseHelper.disableOtherJoinOpForJoin(tEnv, joinType)
+  }
+
+  @Test
+  def testIntersect(): Unit = {
+val data = List(
+  row(1, 1L, "Hi"),
+  row(2, 2L, "Hello"),
+  row(2, 2L, "Hello"),
+  row(3, 2L, "Hello world!")
+)
+val shuffleData = Random.shuffle(data)
+BatchTableEnvUtil.registerCollection(
+  tEnv,
+  "T",
+  shuffleData,
+  new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO),
+  "a, b, c")
+
+checkResult(
+  "SELECT c FROM SmallTable3 INTERSECT SELECT c FROM T",
+  Seq(row("Hi"), row("Hello")))
+  }
+
+  @Test
+  def testIntersectWithFilter(): Unit = {
+checkResult(
+  "SELECT c FROM ((SELECT * FROM SmallTable3) INTERSECT (SELECT * FROM 
Table3)) WHERE a > 1",
+  Seq(row("Hello"), row("Hello world")))
+  }
+
+  @Test
+  def testExcept(): Unit = {
+val data = List(row(1, 1L, "Hi"))
+BatchTableEnvUtil.registerCollection(
+  tEnv,
+  "T", data,
+  new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO),
+  "a, b, c")
+
+checkResult(
+  "SELECT c FROM SmallTable3 EXCEPT (SELECT c FROM T)",
+  Seq(row("Hello"), row("Hello world")))
+  }
+
+  @Test
+  def testExceptWithFilter(): Unit = {
+checkResult(
+  "SELECT c FROM (" +
+  "SELECT * FROM SmallTable3 EXCEPT (SELECT a, b, d FROM Table5))" +
+  "WHERE b < 2",
+  Seq(row("Hi")))
+  }
+
+  @Test
+  def testIntersectWithNulls(): Unit = {
+checkResult(
+  "SELECT c FROM AllNullTable3 INTERSECT SELECT c FROM AllNullTable3",
+  Seq(row(null)))
+  }
+
+  @Test
+  def testExceptWithNulls(): Unit = {
+checkResult(
+  "SELECT c FROM AllNullTable3 EXCEPT SELECT c FROM AllNullTable3",
+  Seq())
+  }
+
+  @Test
+  def testIntersectAll(): Unit = {
+BatchScalaTableEnvUtil.registerCollection(tEnv, "T1", Seq(1, 1, 1, 2, 2), 
"c")
+BatchScalaTableEnvUtil.registerCollection(tEnv, "T2", Seq(1, 2, 2, 2, 3), 
"c")
+checkResult(
+  "SELECT c FROM T1 INTERSECT ALL SELECT c FROM T2",
+  Seq(row(1), row(2), row(2)))
+  }
+
+  @Test
+  def testMinusAll(): Unit = {
+BatchScalaTableEnvUtil.registerCollection(tEnv, "T2", Seq((1, 1L, "Hi")), 
"a, b, c")
+val t1 = "SELECT * FROM SmallTable3"
+val t2 = "SELECT * FROM T2"
+checkResult(
+  s"SELECT c FROM (($t1 UNION ALL $t1 UNION ALL $t1) EXCEPT ALL ($t2 UNION 
ALL $t2))",
+  Seq(
+

[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner

2019-06-26 Thread GitBox
godfreyhe commented on a change in pull request #8898: 
[FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297963859
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceSetOpWithJoinRuleBase.scala
 ##
 @@ -55,4 +71,47 @@ abstract class ReplaceSetOpWithJoinRuleBase[T <: SetOp](
 }
 conditions
   }
+
+  protected def replicateRows(
+  builder: RelBuilder, outputType: RelDataType, fields: 
util.List[Integer]): RelNode = {
+// construct LogicalTableFunctionScan
+val logicalType = toLogicalRowType(outputType)
+val fieldNames = outputType.getFieldNames.toSeq.toArray
+val fieldTypes = toLogicalRowType(outputType)
+.getChildren.map(fromLogicalTypeToTypeInfo).toArray
+val tf = new ReplicateRows(fieldTypes)
+val resultType = fromLegacyInfoToDataType(new RowTypeInfo(fieldTypes, 
fieldNames))
+val function = new TypedFlinkTableFunction(tf, resultType)
+val typeFactory = builder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+val sqlFunction = new TableSqlFunction(
+  tf.functionIdentifier,
+  tf.toString,
+  tf,
+  resultType,
+  typeFactory,
+  function)
+
+val scan = LogicalTableFunctionScan.create(
+  builder.peek().getCluster,
 
 Review comment:
   Create a `cluster` variable? `builder.peek().getCluster` is used twice.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner

2019-06-26 Thread GitBox
godfreyhe commented on a change in pull request #8898: 
[FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297962835
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceSetOpWithJoinRuleBase.scala
 ##
 @@ -55,4 +71,47 @@ abstract class ReplaceSetOpWithJoinRuleBase[T <: SetOp](
 }
 conditions
   }
+
+  protected def replicateRows(
+  builder: RelBuilder, outputType: RelDataType, fields: 
util.List[Integer]): RelNode = {
+// construct LogicalTableFunctionScan
+val logicalType = toLogicalRowType(outputType)
+val fieldNames = outputType.getFieldNames.toSeq.toArray
+val fieldTypes = toLogicalRowType(outputType)
 
 Review comment:
   `toLogicalRowType(outputType)` is duplicated; use the `logicalType` defined above.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner

2019-06-26 Thread GitBox
godfreyhe commented on a change in pull request #8898: 
[FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297964789
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import 
org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.{GREATER_THAN, 
GREATER_THAN_OR_EQUAL, IF}
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.Intersect
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.util.Util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Replaces logical [[Intersect]] operator using a combination of Union, 
Aggregate
+  * and Generate operator.
+  *
+  * Input Query :
 
 Review comment:
   Original Query ?




[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner

2019-06-26 Thread GitBox
godfreyhe commented on a change in pull request #8898: 
[FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297966548
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import 
org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.{GREATER_THAN, 
GREATER_THAN_OR_EQUAL, IF}
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.Intersect
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.util.Util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Replaces logical [[Intersect]] operator using a combination of Union, 
Aggregate
+  * and Generate operator.
+  *
+  * Input Query :
+  * {{{
+  *SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2
+  * }}}
+  *
+  * Rewritten Query:
+  * {{{
+  *   SELECT c1
+  *   FROM (
+  *SELECT replicate_row(min_count, c1)
+  *FROM (
+  * SELECT c1, If (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS 
min_count
+  * FROM (
+  *  SELECT   c1, count(vcol1) as vcol1_cnt, count(vcol2) as 
vcol2_cnt
+  *  FROM (
+  *   SELECT c1, true as vcol1, null as vcol2 FROM ut1
+  *   UNION ALL
+  *   SELECT c1, null as vcol1, true as vcol2 FROM ut2
+  *   ) AS union_all
+  *  GROUP BY c1
+  *  HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1
 
 Review comment:
   Use `count(vcol1)` instead of `vcol1_cnt` in the `HAVING` clause (likewise for `vcol2_cnt`), since those aliases are only introduced in the same `SELECT` list; alternatively, `vcol1_cnt >= 1 AND vcol2_cnt >= 1` could be applied in a `WHERE` clause of an enclosing query.




[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner

2019-06-26 Thread GitBox
godfreyhe commented on a change in pull request #8898: 
[FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297962742
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceSetOpWithJoinRuleBase.scala
 ##
 @@ -55,4 +71,47 @@ abstract class ReplaceSetOpWithJoinRuleBase[T <: SetOp](
 }
 conditions
   }
+
+  protected def replicateRows(
 
 Review comment:
   The class name `ReplaceSetOpWithJoinRuleBase` is no longer appropriate, because `RewriteIntersectAll` and `RewriteMinusAll` do not rewrite `IntersectAll` and `MinusAll` into a join.
   
   Please also add Javadoc for this method.
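
   One possible wording, based only on the rewritten-query description above (a
   suggestion, not the actual documentation; the stub body is only there to keep the
   sketch compilable):

```scala
import java.util

import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.tools.RelBuilder

object ReplicateRowsDocSketch {
  /**
    * Correlates the relation on top of `builder` with the `ReplicateRows` table
    * function, so that each input row is emitted as many times as the replication
    * count carried in its first (BIGINT) column. The columns referenced by `fields`
    * are kept, and the resulting [[RelNode]] has the row type `outputType`.
    */
  def replicateRows(
      builder: RelBuilder, outputType: RelDataType, fields: util.List[Integer]): RelNode = ???
}
```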




[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner

2019-06-26 Thread GitBox
godfreyhe commented on a change in pull request #8898: 
[FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297964429
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import 
org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.{GREATER_THAN, 
GREATER_THAN_OR_EQUAL, IF}
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.Intersect
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.util.Util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Replaces logical [[Intersect]] operator using a combination of Union, 
Aggregate
+  * and Generate operator.
 
 Review comment:
   What is meant by `Generate operator` here?




[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner

2019-06-26 Thread GitBox
godfreyhe commented on a change in pull request #8898: 
[FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297974855
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import 
org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.{GREATER_THAN, 
GREATER_THAN_OR_EQUAL, IF}
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.Intersect
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.util.Util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Replaces logical [[Intersect]] operator using a combination of Union, 
Aggregate
+  * and Generate operator.
+  *
+  * Input Query :
+  * {{{
+  *SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2
+  * }}}
+  *
+  * Rewritten Query:
+  * {{{
+  *   SELECT c1
+  *   FROM (
+  *SELECT replicate_row(min_count, c1)
+  *FROM (
+  * SELECT c1, If (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS 
min_count
+  * FROM (
+  *  SELECT   c1, count(vcol1) as vcol1_cnt, count(vcol2) as 
vcol2_cnt
+  *  FROM (
+  *   SELECT c1, true as vcol1, null as vcol2 FROM ut1
+  *   UNION ALL
+  *   SELECT c1, null as vcol1, true as vcol2 FROM ut2
+  *   ) AS union_all
+  *  GROUP BY c1
+  *  HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1
+  *  )
+  * )
+  * )
+  * }}}
+  */
+class RewriteIntersectAll extends ReplaceSetOpWithJoinRuleBase(
+  classOf[Intersect], "RewriteIntersectAll") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val intersect: Intersect = call.rel(0)
+!intersect.isDistinct
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val intersect: Intersect = call.rel(0)
+val left = intersect.getInput(0)
+val right = intersect.getInput(1)
+
+val fields = Util.range(intersect.getRowType.getFieldCount)
+
+val builder1 = call.builder
+val boolType = builder1.getTypeFactory.createSqlType(SqlTypeName.BOOLEAN)
+val leftWithAddedVirtualCols = builder1
 
 Review comment:
   leftWithAddedVirtualCols => leftWithMarker ?




[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner

2019-06-26 Thread GitBox
godfreyhe commented on a change in pull request #8898: 
[FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297985301
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteMinusAll.scala
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.GREATER_THAN
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.Minus
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.SqlTypeName.BIGINT
+import org.apache.calcite.util.Util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Replaces logical [[Minus]] operator using a combination of Union, Aggregate
+  * and Generate operator.
 
 Review comment:
   same as above




[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner

2019-06-26 Thread GitBox
godfreyhe commented on a change in pull request #8898: 
[FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297969993
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import 
org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.{GREATER_THAN, 
GREATER_THAN_OR_EQUAL, IF}
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.Intersect
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.util.Util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Replaces logical [[Intersect]] operator using a combination of Union, 
Aggregate
+  * and Generate operator.
+  *
+  * Input Query :
+  * {{{
+  *SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2
+  * }}}
+  *
+  * Rewritten Query:
+  * {{{
+  *   SELECT c1
+  *   FROM (
+  *SELECT replicate_row(min_count, c1)
+  *FROM (
+  * SELECT c1, If (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS 
min_count
+  * FROM (
+  *  SELECT   c1, count(vcol1) as vcol1_cnt, count(vcol2) as 
vcol2_cnt
+  *  FROM (
+  *   SELECT c1, true as vcol1, null as vcol2 FROM ut1
+  *   UNION ALL
+  *   SELECT c1, null as vcol1, true as vcol2 FROM ut2
+  *   ) AS union_all
+  *  GROUP BY c1
+  *  HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1
+  *  )
+  * )
+  * )
+  * }}}
+  */
+class RewriteIntersectAll extends ReplaceSetOpWithJoinRuleBase(
+  classOf[Intersect], "RewriteIntersectAll") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val intersect: Intersect = call.rel(0)
+!intersect.isDistinct
 
 Review comment:
   Use `intersect.all` instead.




[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner

2019-06-26 Thread GitBox
godfreyhe commented on a change in pull request #8898: 
[FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297973264
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import 
org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.{GREATER_THAN, 
GREATER_THAN_OR_EQUAL, IF}
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.Intersect
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.util.Util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Replaces logical [[Intersect]] operator using a combination of Union, 
Aggregate
+  * and Generate operator.
+  *
+  * Input Query :
+  * {{{
+  *SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2
+  * }}}
+  *
+  * Rewritten Query:
+  * {{{
+  *   SELECT c1
+  *   FROM (
+  *SELECT replicate_row(min_count, c1)
+  *FROM (
+  * SELECT c1, If (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS 
min_count
+  * FROM (
+  *  SELECT   c1, count(vcol1) as vcol1_cnt, count(vcol2) as 
vcol2_cnt
+  *  FROM (
+  *   SELECT c1, true as vcol1, null as vcol2 FROM ut1
+  *   UNION ALL
+  *   SELECT c1, null as vcol1, true as vcol2 FROM ut2
+  *   ) AS union_all
+  *  GROUP BY c1
+  *  HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1
+  *  )
+  * )
+  * )
+  * }}}
+  */
+class RewriteIntersectAll extends ReplaceSetOpWithJoinRuleBase(
+  classOf[Intersect], "RewriteIntersectAll") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val intersect: Intersect = call.rel(0)
+!intersect.isDistinct
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val intersect: Intersect = call.rel(0)
+val left = intersect.getInput(0)
+val right = intersect.getInput(1)
+
+val fields = Util.range(intersect.getRowType.getFieldCount)
+
+val builder1 = call.builder
+val boolType = builder1.getTypeFactory.createSqlType(SqlTypeName.BOOLEAN)
+val leftWithAddedVirtualCols = builder1
+.push(left)
+.project(
+  builder1.fields(fields) ++ Seq(
+builder1.alias(builder1.literal(true), "vcol1"),
 
 Review comment:
   Please choose a more meaningful name, such as `left_marker`; likewise for `vcol2`.
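
   For illustration, the left-hand projection with such names (otherwise unchanged from
   the diff; the inputs are passed as parameters only to keep the fragment
   self-contained):

```scala
import java.util

import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.tools.RelBuilder

import scala.collection.JavaConversions._

object MarkerNamingSketch {
  // Same projection as in the diff, but "vcol1"/"vcol2" are renamed to
  // "left_marker"/"right_marker" so the purpose of the helper columns is obvious.
  def leftWithMarkers(
      builder: RelBuilder,
      left: RelNode,
      fields: util.List[Integer],
      boolType: RelDataType): RelNode = {
    builder
      .push(left)
      .project(
        builder.fields(fields) ++ Seq(
          builder.alias(builder.literal(true), "left_marker"),
          builder.alias(builder.getRexBuilder.makeNullLiteral(boolType), "right_marker")))
      .build()
  }
}
```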




[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner

2019-06-26 Thread GitBox
godfreyhe commented on a change in pull request #8898: 
[FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297974008
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import 
org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.{GREATER_THAN, 
GREATER_THAN_OR_EQUAL, IF}
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.Intersect
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.util.Util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Replaces logical [[Intersect]] operator using a combination of Union, 
Aggregate
+  * and Generate operator.
+  *
+  * Input Query :
+  * {{{
+  *SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2
+  * }}}
+  *
+  * Rewritten Query:
+  * {{{
+  *   SELECT c1
+  *   FROM (
+  *SELECT replicate_row(min_count, c1)
+  *FROM (
+  * SELECT c1, If (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS 
min_count
+  * FROM (
+  *  SELECT   c1, count(vcol1) as vcol1_cnt, count(vcol2) as 
vcol2_cnt
+  *  FROM (
+  *   SELECT c1, true as vcol1, null as vcol2 FROM ut1
+  *   UNION ALL
+  *   SELECT c1, null as vcol1, true as vcol2 FROM ut2
+  *   ) AS union_all
+  *  GROUP BY c1
+  *  HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1
+  *  )
+  * )
+  * )
+  * }}}
+  */
+class RewriteIntersectAll extends ReplaceSetOpWithJoinRuleBase(
+  classOf[Intersect], "RewriteIntersectAll") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val intersect: Intersect = call.rel(0)
+!intersect.isDistinct
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val intersect: Intersect = call.rel(0)
+val left = intersect.getInput(0)
+val right = intersect.getInput(1)
+
+val fields = Util.range(intersect.getRowType.getFieldCount)
+
+val builder1 = call.builder
+val boolType = builder1.getTypeFactory.createSqlType(SqlTypeName.BOOLEAN)
+val leftWithAddedVirtualCols = builder1
+.push(left)
+.project(
+  builder1.fields(fields) ++ Seq(
+builder1.alias(builder1.literal(true), "vcol1"),
+builder1.alias(builder1.getRexBuilder.makeNullLiteral(boolType), 
"vcol2")))
+.build()
+
+val builder2 = call.builder
 
 Review comment:
   builder2 => rightBuilder ?




[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner

2019-06-26 Thread GitBox
godfreyhe commented on a change in pull request #8898: 
[FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297963892
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceSetOpWithJoinRuleBase.scala
 ##
 @@ -55,4 +71,47 @@ abstract class ReplaceSetOpWithJoinRuleBase[T <: SetOp](
 }
 conditions
   }
+
+  protected def replicateRows(
+  builder: RelBuilder, outputType: RelDataType, fields: 
util.List[Integer]): RelNode = {
+// construct LogicalTableFunctionScan
+val logicalType = toLogicalRowType(outputType)
+val fieldNames = outputType.getFieldNames.toSeq.toArray
+val fieldTypes = toLogicalRowType(outputType)
+.getChildren.map(fromLogicalTypeToTypeInfo).toArray
+val tf = new ReplicateRows(fieldTypes)
+val resultType = fromLegacyInfoToDataType(new RowTypeInfo(fieldTypes, 
fieldNames))
+val function = new TypedFlinkTableFunction(tf, resultType)
+val typeFactory = builder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+val sqlFunction = new TableSqlFunction(
+  tf.functionIdentifier,
+  tf.toString,
+  tf,
+  resultType,
+  typeFactory,
+  function)
+
+val scan = LogicalTableFunctionScan.create(
+  builder.peek().getCluster,
+  new util.ArrayList[RelNode](),
+  builder.call(
+sqlFunction,
+builder.fields(Util.range(fields.size() + 1))),
+  function.getElementType(null),
+  UserDefinedFunctionUtils.buildRelDataType(
+builder.getTypeFactory,
+logicalType,
+fieldNames,
+fieldNames.indices.toArray),
+  null)
+builder.push(scan)
+
+// join
 
 Review comment:
   join => correlate




[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner

2019-06-26 Thread GitBox
godfreyhe commented on a change in pull request #8898: 
[FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297999767
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import 
org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.{GREATER_THAN, 
GREATER_THAN_OR_EQUAL, IF}
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.Intersect
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.util.Util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Replaces logical [[Intersect]] operator using a combination of Union, 
Aggregate
+  * and Generate operator.
+  *
+  * Input Query :
+  * {{{
+  *SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2
+  * }}}
+  *
+  * Rewritten Query:
+  * {{{
+  *   SELECT c1
+  *   FROM (
+  *SELECT replicate_row(min_count, c1)
+  *FROM (
+  * SELECT c1, If (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS 
min_count
+  * FROM (
+  *  SELECT   c1, count(vcol1) as vcol1_cnt, count(vcol2) as 
vcol2_cnt
+  *  FROM (
+  *   SELECT c1, true as vcol1, null as vcol2 FROM ut1
+  *   UNION ALL
+  *   SELECT c1, null as vcol1, true as vcol2 FROM ut2
+  *   ) AS union_all
+  *  GROUP BY c1
+  *  HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1
+  *  )
+  * )
+  * )
+  * }}}
+  */
+class RewriteIntersectAll extends ReplaceSetOpWithJoinRuleBase(
+  classOf[Intersect], "RewriteIntersectAll") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val intersect: Intersect = call.rel(0)
+!intersect.isDistinct
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val intersect: Intersect = call.rel(0)
 
 Review comment:
   `Intersect` and `Minus` may have more than two inputs, and that case is not supported yet.
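
   A hedged sketch of one way to make that restriction explicit (not part of the PR):
   only match binary INTERSECT ALL and leave multi-input `Intersect` to be decomposed
   by a separate rule.

```scala
import org.apache.calcite.plan.RelOptRuleCall
import org.apache.calcite.rel.core.Intersect

object BinaryIntersectAllMatchSketch {
  // Match only INTERSECT ALL with exactly two inputs; a multi-input Intersect would
  // first have to be rewritten into a chain of binary Intersects elsewhere.
  def matches(call: RelOptRuleCall): Boolean = {
    val intersect: Intersect = call.rel(0)
    intersect.all && intersect.getInputs.size() == 2
  }
}
```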




[GitHub] [flink] knaufk commented on issue #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough

2019-06-26 Thread GitBox
knaufk commented on issue #8903: [FLINK-12747][docs] Getting Started - Table 
API Example Walkthrough
URL: https://github.com/apache/flink/pull/8903#issuecomment-506153339
 
 
   Thanks a lot @sjwiesman  for the contribution! A few general thoughts: 
   
   * I really like that it's not the typical generic "wordcount"/"windowed 
wordcount"/etc.
   
   * I would be in favor of simply adding this as an example to `flink-examples-table` without a Maven archetype. Firstly, we already have quickstart modules to create program skeletons; adding an additional option for the walkthroughs adds extra maintenance burden and might confuse users. Secondly, in my opinion a "code walkthrough" should focus on explaining the code of an example so that I can easily follow it; I don't necessarily expect utilities to bootstrap a new program here. (We should have a separate dedicated subsection in the "Getting Started" section for the quickstarts/project setup.)
   
   * Personally, I think the example for a code walkthrough should be no-magic, and I should be able to understand as much as possible as a new user. Here this means going without the custom `TableSource`, which is not easy to understand, and it is not clear why it can be a streaming and a batch source at the same time. Alternatively, one could simply read from a file of transactions (batch case) or from a message queue (streaming case; maybe provide a minimal data generator). This would also go well with the future vision of "simply replace the source and your job turns into a streaming program", although we are not there yet.
   
   * Do we also want to add SQL to this example (probably as a section at the end implementing the same business logic)? Currently we don't plan to have a dedicated "SQL Walkthrough". Do we need an SQL walkthrough at all?
   
   What do you think? What do others think? 




[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner

2019-06-26 Thread GitBox
godfreyhe commented on a change in pull request #8898: 
[FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297975145
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import 
org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.{GREATER_THAN, 
GREATER_THAN_OR_EQUAL, IF}
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.Intersect
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.util.Util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Replaces logical [[Intersect]] operator using a combination of Union, 
Aggregate
+  * and Generate operator.
+  *
+  * Input Query :
+  * {{{
+  *SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2
+  * }}}
+  *
+  * Rewritten Query:
+  * {{{
+  *   SELECT c1
+  *   FROM (
+  *SELECT replicate_row(min_count, c1)
+  *FROM (
+  * SELECT c1, If (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS 
min_count
+  * FROM (
+  *  SELECT   c1, count(vcol1) as vcol1_cnt, count(vcol2) as 
vcol2_cnt
+  *  FROM (
+  *   SELECT c1, true as vcol1, null as vcol2 FROM ut1
+  *   UNION ALL
+  *   SELECT c1, null as vcol1, true as vcol2 FROM ut2
+  *   ) AS union_all
+  *  GROUP BY c1
+  *  HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1
+  *  )
+  * )
+  * )
+  * }}}
+  */
+class RewriteIntersectAll extends ReplaceSetOpWithJoinRuleBase(
+  classOf[Intersect], "RewriteIntersectAll") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val intersect: Intersect = call.rel(0)
+!intersect.isDistinct
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val intersect: Intersect = call.rel(0)
+val left = intersect.getInput(0)
+val right = intersect.getInput(1)
+
+val fields = Util.range(intersect.getRowType.getFieldCount)
+
+val builder1 = call.builder
+val boolType = builder1.getTypeFactory.createSqlType(SqlTypeName.BOOLEAN)
+val leftWithAddedVirtualCols = builder1
+.push(left)
+.project(
+  builder1.fields(fields) ++ Seq(
+builder1.alias(builder1.literal(true), "vcol1"),
+builder1.alias(builder1.getRexBuilder.makeNullLiteral(boolType), 
"vcol2")))
+.build()
+
+val builder2 = call.builder
+val rightWithAddedVirtualCols = builder2
+.push(right)
+.project(
+  builder2.fields(fields) ++ Seq(
+builder2.alias(builder2.getRexBuilder.makeNullLiteral(boolType), 
"vcol1"),
+builder2.alias(builder2.literal(true), "vcol2")))
+.build()
+
+val builder = call.builder
 
 Review comment:
   Please add a brief comment for each step.
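
   For illustration, the same chain with one comment per step (a sketch; the inputs are
   parameters here only so that the fragment reads on its own, and the operators are the
   ones imported from `FlinkSqlOperatorTable` in the diff):

```scala
import java.util

import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.{GREATER_THAN, GREATER_THAN_OR_EQUAL, IF}

import org.apache.calcite.rel.RelNode
import org.apache.calcite.tools.RelBuilder

import scala.collection.JavaConversions._

object IntersectAllStepsSketch {
  def buildCountsAndReplicationFactor(
      builder: RelBuilder,
      leftWithAddedVirtualCols: RelNode,
      rightWithAddedVirtualCols: RelNode,
      fields: util.List[Integer]): RelBuilder = {
    builder
      // Step 1: UNION ALL the two inputs that were extended with the marker columns.
      .push(leftWithAddedVirtualCols)
      .push(rightWithAddedVirtualCols)
      .union(true)
      // Step 2: group by the original fields and count each side's marker column.
      .aggregate(
        builder.groupKey(builder.fields(fields)),
        builder.count(false, "vcol1_count", builder.field("vcol1")),
        builder.count(false, "vcol2_count", builder.field("vcol2")))
      // Step 3: keep only groups that occur at least once on both sides.
      .filter(builder.and(
        builder.call(GREATER_THAN_OR_EQUAL, builder.field("vcol1_count"), builder.literal(1)),
        builder.call(GREATER_THAN_OR_EQUAL, builder.field("vcol2_count"), builder.literal(1))))
      // Step 4: emit min(vcol1_count, vcol2_count) as the replication count, followed
      // by the original fields.
      .project(Seq(builder.call(
        IF,
        builder.call(GREATER_THAN, builder.field("vcol1_count"), builder.field("vcol2_count")),
        builder.field("vcol2_count"),
        builder.field("vcol1_count"))) ++ builder.fields(fields))
  }
}
```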




[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner

2019-06-26 Thread GitBox
godfreyhe commented on a change in pull request #8898: 
[FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297984695
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import 
org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.{GREATER_THAN, 
GREATER_THAN_OR_EQUAL, IF}
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.Intersect
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.util.Util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Replaces logical [[Intersect]] operator using a combination of Union, 
Aggregate
+  * and Generate operator.
+  *
+  * Input Query :
+  * {{{
+  *SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2
+  * }}}
+  *
+  * Rewritten Query:
+  * {{{
+  *   SELECT c1
+  *   FROM (
+  *SELECT replicate_row(min_count, c1)
+  *FROM (
+  * SELECT c1, If (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS 
min_count
+  * FROM (
+  *  SELECT   c1, count(vcol1) as vcol1_cnt, count(vcol2) as 
vcol2_cnt
+  *  FROM (
+  *   SELECT c1, true as vcol1, null as vcol2 FROM ut1
+  *   UNION ALL
+  *   SELECT c1, null as vcol1, true as vcol2 FROM ut2
+  *   ) AS union_all
+  *  GROUP BY c1
+  *  HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1
+  *  )
+  * )
+  * )
+  * }}}
+  */
+class RewriteIntersectAll extends ReplaceSetOpWithJoinRuleBase(
+  classOf[Intersect], "RewriteIntersectAll") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val intersect: Intersect = call.rel(0)
+!intersect.isDistinct
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val intersect: Intersect = call.rel(0)
+val left = intersect.getInput(0)
+val right = intersect.getInput(1)
+
+val fields = Util.range(intersect.getRowType.getFieldCount)
+
+val builder1 = call.builder
+val boolType = builder1.getTypeFactory.createSqlType(SqlTypeName.BOOLEAN)
+val leftWithAddedVirtualCols = builder1
+.push(left)
+.project(
+  builder1.fields(fields) ++ Seq(
+builder1.alias(builder1.literal(true), "vcol1"),
+builder1.alias(builder1.getRexBuilder.makeNullLiteral(boolType), 
"vcol2")))
+.build()
+
+val builder2 = call.builder
+val rightWithAddedVirtualCols = builder2
+.push(right)
+.project(
+  builder2.fields(fields) ++ Seq(
+builder2.alias(builder2.getRexBuilder.makeNullLiteral(boolType), 
"vcol1"),
+builder2.alias(builder2.literal(true), "vcol2")))
+.build()
+
+val builder = call.builder
+builder
+.push(leftWithAddedVirtualCols)
+.push(rightWithAddedVirtualCols)
+.union(true)
+.aggregate(
+  builder.groupKey(builder.fields(fields)),
+  builder.count(false, "vcol1_count", builder.field("vcol1")),
+  builder.count(false, "vcol2_count", builder.field("vcol2")))
+.filter(builder.and(
+  builder.call(
+GREATER_THAN_OR_EQUAL,
+builder.field("vcol1_count"),
+builder.literal(1)),
+  builder.call(
+GREATER_THAN_OR_EQUAL,
+builder.field("vcol2_count"),
+builder.literal(1
+.project(Seq(builder.call(
+  IF,
+  builder.call(GREATER_THAN, builder.field("vcol1_count"), 
builder.field("vcol2_count")),
+  builder.field("vcol2_count"),
+  builder.field("vcol1_count"))) ++ builder.fields(fields))
+
+call.transformTo(replicateRows(builder, intersect.getRowType, fields))
 
 Review comment:
   please define a variable for `replicateRows(builder, intersect.getRowType, 
fields)` result, which could make 

[GitHub] [flink] wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.

2019-06-26 Thread GitBox
wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix 
instability of BoundedBlockingSubpartition under memory pressure.
URL: https://github.com/apache/flink/pull/8880#discussion_r297994922
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java
 ##
 @@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+final class FileChannelMemoryMappedBoundedData implements BoundedData {
 
 Review comment:
   It would be better to add some Javadoc here.




[jira] [Commented] (FLINK-13011) Release the PyFlink into PyPI

2019-06-26 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873799#comment-16873799
 ] 

sunjincheng commented on FLINK-13011:
-

I would appreciate it if you could give some suggestions on how to release PyFlink to PyPI. :) [~Zentol]

> Release the PyFlink into PyPI
> -
>
> Key: FLINK-13011
> URL: https://issues.apache.org/jira/browse/FLINK-13011
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Build System
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Priority: Major
>
> FLINK-12962 adds the ability to build a PyFlink distribution package, but we have not yet released PyFlink to PyPI. The goal of this JIRA is to publish the PyFlink distribution package built by FLINK-12962 to PyPI.
>  





[GitHub] [flink] sunjincheng121 commented on issue #8863: [FLINK-12962][python] Allows pyflink to be pip installed.

2019-06-26 Thread GitBox
sunjincheng121 commented on issue #8863: [FLINK-12962][python] Allows pyflink 
to be pip installed.
URL: https://github.com/apache/flink/pull/8863#issuecomment-506150106
 
 
   I have opened the JIRA (https://issues.apache.org/jira/browse/FLINK-13011) for releasing PyFlink to PyPI.




[jira] [Created] (FLINK-13011) Release the PyFlink into PyPI

2019-06-26 Thread sunjincheng (JIRA)
sunjincheng created FLINK-13011:
---

 Summary: Release the PyFlink into PyPI
 Key: FLINK-13011
 URL: https://issues.apache.org/jira/browse/FLINK-13011
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python, Build System
Affects Versions: 1.9.0
Reporter: sunjincheng


FLINK-12962 adds the ability to build a PyFlink distribution package, but we have not yet released PyFlink to PyPI. The goal of this JIRA is to publish the PyFlink distribution package built by FLINK-12962 to PyPI.

 





[GitHub] [flink] sunjincheng121 commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.

2019-06-26 Thread GitBox
sunjincheng121 commented on a change in pull request #8863: 
[FLINK-12962][python] Allows pyflink to be pip installed.
URL: https://github.com/apache/flink/pull/8863#discussion_r297963075
 
 

 ##
 File path: flink-python/pyflink/version.py
 ##
 @@ -16,4 +16,4 @@
 # limitations under the License.
 

 
-__version__ = "0.1.0"
+__version__ = "1.9.dev0"
 
 Review comment:
   It would be better to add an explanation of the version naming rule, and add the link:
   https://www.python.org/dev/peps/pep-0440
   What do you think?




[GitHub] [flink] sunjincheng121 commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.

2019-06-26 Thread GitBox
sunjincheng121 commented on a change in pull request #8863: 
[FLINK-12962][python] Allows pyflink to be pip installed.
URL: https://github.com/apache/flink/pull/8863#discussion_r297984293
 
 

 ##
 File path: flink-dist/src/main/flink-bin/bin/find-flink-home.sh
 ##
 @@ -0,0 +1,28 @@
+#!/usr/bin/env bash
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+CURRENT_DIR="$( cd "$(dirname "$0")" ; pwd -P )"
+FIND_FLINK_HOME_PYTHON_SCRIPT="$CURRENT_DIR/find_flink_home.py"
+
+if [ ! -f "$FIND_FLINK_HOME_PYTHON_SCRIPT" ]; then
+export FLINK_HOME="$( cd "$CURRENT_DIR"/.. ; pwd -P )"
+else
+PYFLINK_PYTHON="${PYFLINK_PYTHON:-"python"}"
+export FLINK_HOME=$($PYFLINK_PYTHON "$FIND_FLINK_HOME_PYTHON_SCRIPT")
 
 Review comment:
   Remove `$PYFLINK_PYTHON` and only use `$FIND_FLINK_HOME_PYTHON_SCRIPT`, since we have already added the `/usr/bin/env python` shebang to `find_flink_home.py`.




[GitHub] [flink] sunjincheng121 commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.

2019-06-26 Thread GitBox
sunjincheng121 commented on a change in pull request #8863: 
[FLINK-12962][python] Allows pyflink to be pip installed.
URL: https://github.com/apache/flink/pull/8863#discussion_r297983205
 
 

 ##
 File path: docs/flinkDev/building.md
 ##
 @@ -58,6 +58,17 @@ mvn clean install -DskipTests -Dfast
 
 The default build adds a Flink-specific JAR for Hadoop 2, to allow using Flink 
with HDFS and YARN.
 
+## Build PyFlink
+
+If you want to build a PyFlink package that can be used for pip installation, 
you need to build Flink jars first, as described in [Build Flink](##Build 
Flink).
+Then go to the root directory of flink source code and run this command to 
build a sdist package:
+
+{% highlight bash %}
+cd flink-python; python setup.py sdist
+{% endhighlight %}
+
+The sdist package will be found under `./flink-python/dist/`.
 
 Review comment:
   It would be better to add a description of how to install `pyflink`. What do you think?




[GitHub] [flink] sunjincheng121 commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.

2019-06-26 Thread GitBox
sunjincheng121 commented on a change in pull request #8863: 
[FLINK-12962][python] Allows pyflink to be pip installed.
URL: https://github.com/apache/flink/pull/8863#discussion_r297963181
 
 

 ##
 File path: flink-python/pyflink/shell.py
 ##
 @@ -131,8 +139,8 @@
 * t.select("a + 1, b, c").insert_into("stream_sink")
 *
 * st_env.exec_env().execute()
-  '''
-print(welcome_msg)
+'''
+utf8_out.write(welcome_msg)
 
 Review comment:
   Good catch!




[GitHub] [flink] sunjincheng121 commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.

2019-06-26 Thread GitBox
sunjincheng121 commented on a change in pull request #8863: 
[FLINK-12962][python] Allows pyflink to be pip installed.
URL: https://github.com/apache/flink/pull/8863#discussion_r297971249
 
 

 ##
 File path: flink-dist/src/main/flink-bin/bin/config.sh
 ##
 @@ -296,7 +296,10 @@ bin=`dirname "$target"`
 SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P`
 
 # Define the main directory of the flink installation
-FLINK_HOME=`dirname "$SYMLINK_RESOLVED_BIN"`
+# If config.sh is called by pyflink-shell.sh in python bin directory(pip 
installed), use the _PYFLINK_HOME
 
 Review comment:
   `use the _PYFLINK_HOME` -> `then do not need to set the FLINK_HOME here.` ?




[jira] [Commented] (FLINK-4399) Add support for oversized messages

2019-06-26 Thread Biao Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873791#comment-16873791
 ] 

Biao Liu commented on FLINK-4399:
-

Recently, at least two users have run into the oversized-message issue. I'll try to resolve this. Since a general solution is a bit complicated, I will attach a design doc in the next few days to start a discussion.
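
For illustration only, the blob-pointer approach described in the quoted issue below could look roughly like the following sketch (all names are invented; this is not Flink's actual RPC code):

{code}
// Illustrative sketch: a payload larger than the configured frame size is stored as a
// transient blob and only a small pointer message is sent over RPC instead.
sealed trait RpcMessage
final case class DirectMessage(payload: Array[Byte]) extends RpcMessage
final case class BlobPointerMessage(blobKey: String) extends RpcMessage

class LargeMessageHandler(maxFrameSize: Int, putTransientBlob: Array[Byte] => String) {

  def wrap(payload: Array[Byte]): RpcMessage =
    if (payload.length <= maxFrameSize) {
      DirectMessage(payload)
    } else {
      // The receiver resolves the key via the blob service and deletes the blob afterwards.
      BlobPointerMessage(putTransientBlob(payload))
    }
}
{code}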

> Add support for oversized messages
> --
>
> Key: FLINK-4399
> URL: https://issues.apache.org/jira/browse/FLINK-4399
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
> Environment: FLIP-6 feature branch
>Reporter: Stephan Ewen
>Assignee: Biao Liu
>Priority: Major
>  Labels: flip-6
>
> Currently, messages larger than the maximum Akka Framesize cause an error 
> when being transported. We should add a way to pass messages that are larger 
> than the Framesize, as may happen for:
>   - {{collect()}} calls that collect large data sets (via accumulators)
>   - Job submissions and operator deployments where the functions closures are 
> large (for example because it contains large pre-loaded data)
>   - Function restore in cases where restored state is larger than 
> checkpointed state (union state)
> I suggest to use the {{BlobManager}} to transfer large payload.
>   - On the sender side, oversized messages are stored under a transient blob 
> (which is deleted after first retrieval, or after a certain number of minutes)
>   - The sender sends a "pointer to blob message" instead.
>   - The receiver grabs the message from the blob upon receiving the pointer 
> message
> The RPC Service should be optionally initializable with a "large message 
> handler" which is internally the {{BlobManager}}.





[jira] [Created] (FLINK-13010) Refactor the process of SchedulerNG#requestPartitionState

2019-06-26 Thread zhijiang (JIRA)
zhijiang created FLINK-13010:


 Summary: Refactor the process of SchedulerNG#requestPartitionState
 Key: FLINK-13010
 URL: https://issues.apache.org/jira/browse/FLINK-13010
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


Currently `requestPartitionState` is mainly used to query the partition state when the consumer receives a `PartitionNotFoundException` while requesting a partition. However, we do not actually have a notion of partition state at the moment, and `requestPartitionState` returns the corresponding producer's state as the result, so there is a contradiction here.

My suggestion is to rename the method to `requestPartitionProducerState`; then we no longer need to pass the `IntermediateDataSetID` and `ResultPartitionID` arguments to find the corresponding execution attempt. We could pass only the `ExecutionAttemptID`, and the corresponding execution attempt can easily be found from the mapping in `ExecutionGraph`.

To do so, we could further remove `IntermediateDataSetID` from `SingleInputGate` and might replace `IntermediateDataSetID` with `InputGateID` in `InputGateDeploymentDescriptor`.
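
For illustration, the proposed shape could look roughly like this (a sketch only; the actual interface is Java and the exact signature is still open):

{code}
import org.apache.flink.runtime.execution.ExecutionState
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID

// Illustrative sketch only: the producer state is looked up purely by the attempt id,
// without passing IntermediateDataSetID / ResultPartitionID.
trait ProducerStateLookup {
  def requestPartitionProducerState(producerAttemptId: ExecutionAttemptID): ExecutionState
}
{code}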





[jira] [Updated] (FLINK-13009) YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots thro

2019-06-26 Thread Congxian Qiu(klion26) (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) updated FLINK-13009:
--
Description: 
The test 
{{YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots}}
 throws an NPE during the Travis build.

From the code of hadoop-2.8.3[1], it seems that {{rmContext}} is null
{code:java}
// Only add in the running containers if this is the active attempt.
RMAppAttempt currentAttempt = rmContext.getRMApps()
.get(attemptId.getApplicationId()).getCurrentAppAttempt();
{code}
 

log [https://api.travis-ci.org/v3/job/550689578/log.txt]

[1] 
https://github.com/apache/hadoop/blob/b3fe56402d908019d99af1f1f4fc65cb1d1436a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java#L128

  was:
The test 
{{YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots}}
 throws NPE during the Travis build.

From the code of hadoop-2.8.3, it seems the {{rmContext}} is null
{code:java}
// code from 
https://github.com/apache/hadoop/blob/b3fe56402d908019d99af1f1f4fc65cb1d1436a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java#L128

// Only add in the running containers if this is the active attempt.
RMAppAttempt currentAttempt = rmContext.getRMApps()
.get(attemptId.getApplicationId()).getCurrentAppAttempt();
{code}
 

log [https://api.travis-ci.org/v3/job/550689578/log.txt]


> YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots
>  throws NPE on Travis
> -
>
> Key: FLINK-13009
> URL: https://issues.apache.org/jira/browse/FLINK-13009
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: Congxian Qiu(klion26)
>Priority: Major
>
> The test 
> {{YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots}}
>  throws NPE during the Travis build.
> From the code of hadoop-2.8.3[1], it seems the {{rmContext}} is null
> {code:java}
> // Only add in the running containers if this is the active attempt.
>   RMAppAttempt currentAttempt = rmContext.getRMApps()
>   .get(attemptId.getApplicationId()).getCurrentAppAttempt();
> {code}
>  
> log [https://api.travis-ci.org/v3/job/550689578/log.txt]
> [1] 
> https://github.com/apache/hadoop/blob/b3fe56402d908019d99af1f1f4fc65cb1d1436a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java#L128



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-13009) YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots thro

2019-06-26 Thread Congxian Qiu(klion26) (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) updated FLINK-13009:
--
Description: 
The test 
{{YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots}}
 throws NPE on Travis.

From the code of hadoop-2.8.3[1], it seems the {{rmContext}} is null
{code:java}
// Only add in the running containers if this is the active attempt.
RMAppAttempt currentAttempt = rmContext.getRMApps()
.get(attemptId.getApplicationId()).getCurrentAppAttempt();
{code}
 

log [https://api.travis-ci.org/v3/job/550689578/log.txt]

[1] 
[https://github.com/apache/hadoop/blob/b3fe56402d908019d99af1f1f4fc65cb1d1436a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java#L128]

  was:
The test 
{{YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots}}
 throws NPE during the Travis build.

From the code of hadoop-2.8.3[1], it seems the {{rmContext}} is null
{code:java}
// Only add in the running containers if this is the active attempt.
RMAppAttempt currentAttempt = rmContext.getRMApps()
.get(attemptId.getApplicationId()).getCurrentAppAttempt();
{code}
 

log [https://api.travis-ci.org/v3/job/550689578/log.txt]

[1] 
https://github.com/apache/hadoop/blob/b3fe56402d908019d99af1f1f4fc65cb1d1436a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java#L128


> YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots
>  throws NPE on Travis
> -
>
> Key: FLINK-13009
> URL: https://issues.apache.org/jira/browse/FLINK-13009
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: Congxian Qiu(klion26)
>Priority: Major
>
> The test 
> {{YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots}}
>  throws NPE on Travis.
> From the code of hadoop-2.8.3[1], it seems the {{rmContext}} is null
> {code:java}
> // Only add in the running containers if this is the active attempt.
>   RMAppAttempt currentAttempt = rmContext.getRMApps()
>   .get(attemptId.getApplicationId()).getCurrentAppAttempt();
> {code}
>  
> log [https://api.travis-ci.org/v3/job/550689578/log.txt]
> [1] 
> [https://github.com/apache/hadoop/blob/b3fe56402d908019d99af1f1f4fc65cb1d1436a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java#L128]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-4399) Add support for oversized messages

2019-06-26 Thread Biao Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu reassigned FLINK-4399:
---

Assignee: Biao Liu

> Add support for oversized messages
> --
>
> Key: FLINK-4399
> URL: https://issues.apache.org/jira/browse/FLINK-4399
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
> Environment: FLIP-6 feature branch
>Reporter: Stephan Ewen
>Assignee: Biao Liu
>Priority: Major
>  Labels: flip-6
>
> Currently, messages larger than the maximum Akka Framesize cause an error 
> when being transported. We should add a way to pass messages that are larger 
> than the Framesize, as may happen for:
>   - {{collect()}} calls that collect large data sets (via accumulators)
>   - Job submissions and operator deployments where the function closures are 
> large (for example because they contain large pre-loaded data)
>   - Function restore in cases where restored state is larger than 
> checkpointed state (union state)
> I suggest using the {{BlobManager}} to transfer large payloads.
>   - On the sender side, oversized messages are stored under a transient blob 
> (which is deleted after first retrieval, or after a certain number of minutes)
>   - The sender sends a "pointer to blob message" instead.
>   - The receiver grabs the message from the blob upon receiving the pointer 
> message
> The RPC Service should be optionally initializable with a "large message 
> handler" which is internally the {{BlobManager}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8906: [FLINK-13008]fix the findbugs warning in AggregationsFunctio.scala

2019-06-26 Thread GitBox
flinkbot commented on issue #8906: [FLINK-13008]fix the findbugs warning in 
AggregationsFunctio.scala
URL: https://github.com/apache/flink/pull/8906#issuecomment-506144046
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-13008) fix the findbugs warning in AggregationsFunction

2019-06-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-13008:
---
Labels: pull-request-available  (was: )

> fix the findbugs warning in AggregationsFunction
> 
>
> Key: FLINK-13008
> URL: https://issues.apache.org/jira/browse/FLINK-13008
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: wangpeibin
>Assignee: wangpeibin
>Priority: Minor
>  Labels: pull-request-available
>
> The goal is to fix the findbugs warning in 
> [GeneratedAggregations|https://github.com/apache/flink/blob/e5cadf69d176181af7d097928bf2a6cade1b6c76/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala].
>  
> In the remove function at line 152, it creates the UnsupportedOperationException() 
> rather than throwing it, which causes a findbugs warning:
>  
> {code:java}
> override def remove(): Unit = new java.lang.UnsupportedOperationException
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] klion26 commented on issue #8895: [FLINK-12536][Runtime/Network]Make BufferOrEventSequence#getNext non blocking

2019-06-26 Thread GitBox
klion26 commented on issue #8895: [FLINK-12536][Runtime/Network]Make 
BufferOrEventSequence#getNext non blocking
URL: https://github.com/apache/flink/pull/8895#issuecomment-506143844
 
 
   The Travis build failure seems irrelevant; I have filed an 
[issue](https://issues.apache.org/jira/browse/FLINK-13009) to track it.
   
   ```
   Test 
testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots(org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase)
 failed with:
   java.lang.NullPointerException: java.lang.NullPointerException
at 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics.getAggregateAppResourceUsage(RMAppAttemptMetrics.java:128)
at 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl.getApplicationResourceUsageReport(RMAppAttemptImpl.java:900)
at 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl.createAndGetApplicationReport(RMAppImpl.java:660)
at 
org.apache.hadoop.yarn.server.resourcemanager.ClientRMService.getApplications(ClientRMService.java:930)
at 
org.apache.hadoop.yarn.api.impl.pb.service.ApplicationClientProtocolPBServiceImpl.getApplications(ApplicationClientProtocolPBServiceImpl.java:273)
at 
org.apache.hadoop.yarn.proto.ApplicationClientProtocol$ApplicationClientProtocolService$2.callBlockingMethod(ApplicationClientProtocol.java:507)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:847)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:790)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2486)
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wangpeibin713 opened a new pull request #8906: [FLINK-13008]fix the findbugs warning in AggregationsFunctio.scala

2019-06-26 Thread GitBox
wangpeibin713 opened a new pull request #8906: [FLINK-13008]fix the findbugs 
warning in AggregationsFunctio.scala
URL: https://github.com/apache/flink/pull/8906
 
 
   
   
   ## What is the purpose of the change
   
   - The goal is to fix the findbugs warning in AggregationsFunctio.scala
   https://issues.apache.org/jira/browse/FLINK-13008
   
   ## Brief change log
   
 - *throw the unsupportedOperationException rather than return it*

   ## Verifying this change
   
   * This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-13009) YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots thro

2019-06-26 Thread Congxian Qiu(klion26) (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) updated FLINK-13009:
--
Summary: 
YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots
 throws NPE on Travis  (was: 
YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots)

> YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots
>  throws NPE on Travis
> -
>
> Key: FLINK-13009
> URL: https://issues.apache.org/jira/browse/FLINK-13009
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: Congxian Qiu(klion26)
>Priority: Major
>
> The test 
> {{YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots}}
>  throws NPE during the Travis build.
> From the code of hadoop-2.8.3, it seems the {{rmContext}} is null
> {code:java}
> // code from 
> https://github.com/apache/hadoop/blob/b3fe56402d908019d99af1f1f4fc65cb1d1436a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java#L128
> // Only add in the running containers if this is the active attempt.
>   RMAppAttempt currentAttempt = rmContext.getRMApps()
>   .get(attemptId.getApplicationId()).getCurrentAppAttempt();
> {code}
>  
> log [https://api.travis-ci.org/v3/job/550689578/log.txt]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13009) YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots

2019-06-26 Thread Congxian Qiu(klion26) (JIRA)
Congxian Qiu(klion26) created FLINK-13009:
-

 Summary: 
YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots
 Key: FLINK-13009
 URL: https://issues.apache.org/jira/browse/FLINK-13009
 Project: Flink
  Issue Type: Test
  Components: Tests
Reporter: Congxian Qiu(klion26)


The test 
{{YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots}}
 throws NPE during the Travis build.

From the code of hadoop-2.8.3, it seems the {{rmContext}} is null
{code:java}
// code from 
https://github.com/apache/hadoop/blob/b3fe56402d908019d99af1f1f4fc65cb1d1436a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java#L128

// Only add in the running containers if this is the active attempt.
RMAppAttempt currentAttempt = rmContext.getRMApps()
.get(attemptId.getApplicationId()).getCurrentAppAttempt();
{code}
 

log [https://api.travis-ci.org/v3/job/550689578/log.txt]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] wuchong commented on issue #8530: [FLINK-12611][table-planner-blink] Make time indicator nullable in blink

2019-06-26 Thread GitBox
wuchong commented on issue #8530: [FLINK-12611][table-planner-blink] Make time 
indicator nullable in blink
URL: https://github.com/apache/flink/pull/8530#issuecomment-506143355
 
 
   This looks good to me.
   +1 to merge. 
   
   I will rebase this and merge it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.

2019-06-26 Thread GitBox
wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix 
instability of BoundedBlockingSubpartition under memory pressure.
URL: https://github.com/apache/flink/pull/8880#discussion_r297963987
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+
+/**
+ * Putting and getting of a sequence of buffers to/from a FileChannel or a 
ByteBuffer.
+ * This class handles the headers, length encoding, memory slicing.
+ *
+ * The encoding is the same across FileChannel and ByteBuffer, so this 
class can
+ * write to a file and read from the byte buffer that results from mapping 
this file to memory.
+ */
+final class BufferReaderWriterUtil {
+
+   static final int HEADER_LENGTH = 8;
+
+   static final int HEADER_VALUE_IS_BUFFER = 0;
+
+   static final int HEADER_VALUE_IS_EVENT = 1;
+
+   // 

+   //  ByteBuffer read / write
+   // 

+
+   static boolean writeBuffer(Buffer buffer, ByteBuffer memory) {
+   final int bufferSize = buffer.getSize();
+
+   if (memory.remaining() < bufferSize + HEADER_LENGTH) {
+   return false;
+   }
+
+   memory.putInt(buffer.isBuffer() ? HEADER_VALUE_IS_BUFFER : 
HEADER_VALUE_IS_EVENT);
+   memory.putInt(bufferSize);
+   memory.put(buffer.getNioBufferReadable());
+   return true;
+   }
+
+   @Nullable
+   static Buffer sliceNextBuffer(ByteBuffer memory) {
+   final int remaining = memory.remaining();
+
+   // we only check the correct case where data is exhausted
+   // all other cases can only occur if our write logic is wrong 
and will already throw
+   // buffer underflow exceptions which will cause the read to 
fail.
+   if (remaining == 0) {
+   return null;
+   }
+
+   final int header = memory.getInt();
+   final int size = memory.getInt();
+
+   memory.limit(memory.position() + size);
+   ByteBuffer buf = memory.slice();
+   memory.position(memory.limit());
+   memory.limit(memory.capacity());
+
+   MemorySegment memorySegment = 
MemorySegmentFactory.wrapOffHeapMemory(buf);
+
+   return bufferFromMemorySegment(
+   memorySegment,
+   FreeingBufferRecycler.INSTANCE,
+   size,
+   header == HEADER_VALUE_IS_EVENT);
+   }
+
+   // 

+   //  ByteChannel read / write
+   // 

+
+   static long writeToByteChannel(
+   FileChannel channel,
+   Buffer buffer,
+   ByteBuffer[] arrayWithHeaderBuffer) throws IOException {
+
+   final ByteBuffer headerBuffer = arrayWithHeaderBuffer[0];
+   headerBuffer.clear();
+   headerBuffer.putInt(buffer.isBuffer() ? HEADER_VALUE_IS_BUFFER 
: HEADER_VALUE_IS_EVENT);
+   headerBuffer.putInt(buffer.getSize());
+   headerBuffer.flip();

[jira] [Commented] (FLINK-12848) Method equals() in RowTypeInfo should consider fieldsNames

2019-06-26 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873786#comment-16873786
 ] 

Jark Wu commented on FLINK-12848:
-

Hi [~aloyszhang] [~yanghua] [~enricoc], thanks for reporting this. 

However, I don't think comparing field names in RowTypeInfo#equals is a safe 
approach. As [~enricoc] mentioned, we explicitly explained in FLINK-9444 that 
RowTypeInfo#equals should not check field names. RowTypeInfo is information that 
describes the structure of {{org.apache.flink.types.Row}}, and there are no field 
names in Row. At runtime (e.g. serialization/deserialization), the field names 
don't matter either (see the sketch below).

We heavily used TypeInformation (including #equals) in SQL before, and this led to 
a lot of problems. That's why [~tiwalter] recently introduced a well-defined type 
system in SQL. This work is still ongoing, and we will eventually replace 
TypeInformation with the new type system entirely in SQL/Table. At that point, 
the problems you encountered should be fixed.

[~aloyszhang] could you attach the implementation of 
{{SimpleProcessionTimeSource}}? Without it I can't reproduce the exception you 
mentioned. We can then try it again to see whether it has been solved.
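
For illustration, a minimal example of the behaviour described above (assuming the current RowTypeInfo#equals semantics, i.e. field names are ignored):

{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class RowTypeInfoEqualsExample {
	public static void main(String[] args) {
		RowTypeInfo first = new RowTypeInfo(
			new TypeInformation<?>[]{Types.INT}, new String[]{"first001"});
		RowTypeInfo second = new RowTypeInfo(
			new TypeInformation<?>[]{Types.INT}, new String[]{"second002"});

		// equals() only compares the field types, so this prints "true"
		// even though the field names differ.
		System.out.println(first.equals(second));
	}
}
{code}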


> Method equals() in RowTypeInfo should consider fieldsNames
> --
>
> Key: FLINK-12848
> URL: https://issues.apache.org/jira/browse/FLINK-12848
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.7.2
>Reporter: aloyszhang
>Assignee: aloyszhang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Since `RowTypeInfo#equals()` does not consider the fieldNames, an error in the 
> field names may occur when processing data with a RowTypeInfo type.  
> {code:java}
> String [] fields = new String []{"first", "second"};
> TypeInformation[] types = new TypeInformation[]{
> Types.ROW_NAMED(new String[]{"first001"}, Types.INT),
> Types.ROW_NAMED(new String[]{"second002"}, Types.INT) }; 
> StreamExecutionEnvironment execEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment env = 
> StreamTableEnvironment.getTableEnvironment(execEnv);
> SimpleProcessionTimeSource streamTableSource = new 
> SimpleProcessionTimeSource(fields, types);
> env.registerTableSource("testSource", streamTableSource);
> Table sourceTable = env.scan("testSource");
> System.out.println("Source table schema : ");
> sourceTable.printSchema();
> {code}
> The table schema will be 
> {code:java}
> Source table schema : 
> root 
> |-- first: Row(first001: Integer) 
> |-- second: Row(first001: Integer) 
> |-- timestamp: TimeIndicatorTypeInfo(proctime)
> {code}
> the second field has the same name as the first field.
> So, we should consider the field names in RowTypeInfo#equals().
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] klion26 commented on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint

2019-06-26 Thread GitBox
klion26 commented on issue #8617: [FLINK-12619][StateBackend]Support 
TERMINATE/SUSPEND Job with Checkpoint
URL: https://github.com/apache/flink/pull/8617#issuecomment-506141650
 
 
   Hi, @yumengz5  what do you mean by "task will block all the time"? Would you 
mind explaining it more?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] aloyszhang commented on a change in pull request #8902: [hotfix][docs] fix wrong link in streamfile_sink

2019-06-26 Thread GitBox
aloyszhang commented on a change in pull request #8902: [hotfix][docs] fix 
wrong link in streamfile_sink
URL: https://github.com/apache/flink/pull/8902#discussion_r297962271
 
 

 ##
 File path: docs/dev/connectors/streamfile_sink.md
 ##
 @@ -24,7 +24,7 @@ under the License.
 -->
 
 This connector provides a Sink that writes partitioned files to filesystems
-supported by the [Flink `FileSystem` abstraction]({{ 
site.baseurl}}/ops/filesystems.html).
+supported by the [Flink `FileSystem` abstraction]({{ 
site.baseurl}}/ops/filesystems).
 
 Review comment:
   Hi zentol,
   I have been working on translating this doc to Chinese. So, I will fix the 
Chinese version in 
[FLINK-12944](https://issues.apache.org/jira/projects/FLINK/issues/FLINK-12944?filter=allissues)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] aloyszhang commented on a change in pull request #8902: [hotfix][docs] fix wrong link in streamfile_sink

2019-06-26 Thread GitBox
aloyszhang commented on a change in pull request #8902: [hotfix][docs] fix 
wrong link in streamfile_sink
URL: https://github.com/apache/flink/pull/8902#discussion_r297962271
 
 

 ##
 File path: docs/dev/connectors/streamfile_sink.md
 ##
 @@ -24,7 +24,7 @@ under the License.
 -->
 
 This connector provides a Sink that writes partitioned files to filesystems
-supported by the [Flink `FileSystem` abstraction]({{ 
site.baseurl}}/ops/filesystems.html).
+supported by the [Flink `FileSystem` abstraction]({{ 
site.baseurl}}/ops/filesystems).
 
 Review comment:
   Hi @zentol,
   I have been working on translating this doc to Chinese. So, I will fix the 
Chinese version in 
[FLINK-12944](https://issues.apache.org/jira/projects/FLINK/issues/FLINK-12944?filter=allissues)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-13008) fix the findbugs warning in AggregationsFunction

2019-06-26 Thread wangpeibin (JIRA)
wangpeibin created FLINK-13008:
--

 Summary: fix the findbugs warning in AggregationsFunction
 Key: FLINK-13008
 URL: https://issues.apache.org/jira/browse/FLINK-13008
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Reporter: wangpeibin
Assignee: wangpeibin


The goal is to fix the findbugs warning in 
[GeneratedAggregations|https://github.com/apache/flink/blob/e5cadf69d176181af7d097928bf2a6cade1b6c76/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala].

In the remove function at line 152, it creates the UnsupportedOperationException() 
rather than throwing it, which causes a findbugs warning:

 
{code:java}
override def remove(): Unit = new java.lang.UnsupportedOperationException
{code}
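
For illustration, a Java analogue of the same pattern with hypothetical iterator classes (not the generated Flink code): constructing the exception leaves remove() as a silent no-op, while throwing it actually signals the unsupported operation to the caller.

{code:java}
import java.util.Iterator;
import java.util.NoSuchElementException;

// Buggy variant: the exception object is created but never thrown, so callers silently succeed.
class BuggyIterator<T> implements Iterator<T> {
	@Override public boolean hasNext() { return false; }
	@Override public T next() { throw new NoSuchElementException(); }
	@Override public void remove() {
		new UnsupportedOperationException(); // created, never thrown
	}
}

// Fixed variant: the exception is thrown, which is the intended behaviour.
class FixedIterator<T> implements Iterator<T> {
	@Override public boolean hasNext() { return false; }
	@Override public T next() { throw new NoSuchElementException(); }
	@Override public void remove() {
		throw new UnsupportedOperationException();
	}
}
{code}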
 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] yumengz5 commented on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint

2019-06-26 Thread GitBox
yumengz5 commented on issue #8617: [FLINK-12619][StateBackend]Support 
TERMINATE/SUSPEND Job with Checkpoint
URL: https://github.com/apache/flink/pull/8617#issuecomment-506135399
 
 
   What if the checkpoint times out? Will the task block all the time?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12609) Align the Python data types with Java

2019-06-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12609:
---
Labels: pull-request-available  (was: )

> Align the Python data types with Java
> -
>
> Key: FLINK-12609
> URL: https://issues.apache.org/jira/browse/FLINK-12609
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> As discussed in 
> https://github.com/apache/flink/pull/8420#issuecomment-495444623, currently, 
> there are some data types defined in Java not supported in Python such as 
> TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE, etc. We should 
> support them in Python once these types have been fully supported in Java.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] asfgit closed pull request #8847: [FLINK-12609][python] Align the Python data types with Java

2019-06-26 Thread GitBox
asfgit closed pull request #8847: [FLINK-12609][python] Align the Python data 
types with Java
URL: https://github.com/apache/flink/pull/8847
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12609) Align the Python data types with Java

2019-06-26 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-12609.
---
   Resolution: Fixed
Fix Version/s: 1.9.0

Fixed in master: 0ed394ef0908a88502d198a90f49e04b123a4eda

> Align the Python data types with Java
> -
>
> Key: FLINK-12609
> URL: https://issues.apache.org/jira/browse/FLINK-12609
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
> Fix For: 1.9.0
>
>
> As discussed in 
> https://github.com/apache/flink/pull/8420#issuecomment-495444623, currently, 
> there are some data types defined in Java that are not supported in Python, such as 
> TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE, etc. We should 
> support them in Python once these types have been fully supported in Java.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-13007) Integrate Flink's FunctionCatalog with Catalog APIs

2019-06-26 Thread Bowen Li (JIRA)
Bowen Li created FLINK-13007:


 Summary: Integrate Flink's FunctionCatalog with Catalog APIs
 Key: FLINK-13007
 URL: https://issues.apache.org/jira/browse/FLINK-13007
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API, Table SQL / Planner
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.9.0


Integrate Flink's FunctionCatalog with Catalog APIs so that user-defined 
functions can be persisted through catalog APIs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12999) Can't generate valid execution plan for "SELECT uuid() FROM VALUES(1,2,3) T(a, b, c)"

2019-06-26 Thread Zhenghua Gao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-12999:
-
Summary: Can't generate valid execution plan for "SELECT uuid() FROM 
VALUES(1,2,3) T(a, b, c)"  (was: Can't generate valid execution plan for 
"SELECT uuid() FROM VALUES(1) T(a)")

> Can't generate valid execution plan for "SELECT uuid() FROM VALUES(1,2,3) 
> T(a, b, c)"
> -
>
> Key: FLINK-12999
> URL: https://issues.apache.org/jira/browse/FLINK-12999
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0
>Reporter: Zhenghua Gao
>Assignee: godfrey he
>Priority: Major
>
> The ERROR message is: 
>  
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query:
> LogicalSink(fields=[EXPR$0])
> +- LogicalProject(EXPR$0=[UUID()])
>  +- LogicalValues(tuples=[[\{ 1, 2, 3 }]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> at 
> org.apache.flink.table.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:72)
>  at 
> org.apache.flink.table.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>  at 
> org.apache.flink.table.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>  at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>  at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>  at 
> org.apache.flink.table.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>  at 
> org.apache.flink.table.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:82)
>  at 
> org.apache.flink.table.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:51)
>  at 
> org.apache.flink.table.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:39)
>  at 
> org.apache.flink.table.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:39)
>  at scala.collection.immutable.List.foreach(List.scala:392)
>  at 
> org.apache.flink.table.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:39)
>  at 
> org.apache.flink.table.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:65)
>  at 
> org.apache.flink.table.api.TableEnvironment.optimize(TableEnvironment.scala:251)
>  at 
> org.apache.flink.table.api.TableEnvironment.compileToExecNodePlan(TableEnvironment.scala:200)
>  at 
> org.apache.flink.table.api.TableEnvironment.compile(TableEnvironment.scala:184)
>  at 
> org.apache.flink.table.api.TableEnvironment.generateStreamGraph(TableEnvironment.scala:155)
>  at 
> org.apache.flink.table.api.BatchTableEnvironment.execute(BatchTableEnvironment.scala:93)
>  at 
> org.apache.flink.table.api.TableEnvironment.execute(TableEnvironment.scala:136)
>  at 
> org.apache.flink.table.runtime.utils.BatchTableEnvUtil$.collect(BatchTableEnvUtil.scala:55)
>  at 
> org.apache.flink.table.runtime.utils.TableUtil$.collectSink(TableUtil.scala:60)
>  at 
> org.apache.flink.table.runtime.utils.TableUtil$.collect(TableUtil.scala:41)
>  at 
> org.apache.flink.table.runtime.utils.BatchTestBase.executeQuery(BatchTestBase.scala:308)
>  at 
> org.apache.flink.table.runtime.utils.BatchTestBase.check(BatchTestBase.scala:164)
>  at 
> org.apache.flink.table.runtime.utils.BatchTestBase.checkResult(BatchTestBase.scala:103)
>  at 
> org.apache.flink.table.runtime.batch.sql.ValuesITCase.test(ValuesITCase.scala:38)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> 

[GitHub] [flink] flinkbot edited a comment on issue #8847: [FLINK-12609][python] Align the Python data types with Java

2019-06-26 Thread GitBox
flinkbot edited a comment on issue #8847: [FLINK-12609][python] Align the 
Python data types with Java
URL: https://github.com/apache/flink/pull/8847#issuecomment-504894006
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @sunjincheng121 [committer]
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @sunjincheng121 [committer]
   * ❓ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @sunjincheng121 [committer]
   * ✅ 5. Overall code [quality] is good.
   - Approved by @sunjincheng121 [committer]
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8847: [FLINK-12609][python] Align the Python data types with Java

2019-06-26 Thread GitBox
sunjincheng121 commented on a change in pull request #8847: 
[FLINK-12609][python] Align the Python data types with Java
URL: https://github.com/apache/flink/pull/8847#discussion_r297952647
 
 

 ##
 File path: flink-python/pyflink/table/table_environment.py
 ##
 @@ -494,9 +494,11 @@ def verify_obj(obj):
 raise TypeError(
 "schema should be RowType, list, tuple or None, but got: %s" % 
schema)
 
+# verifies the elements against the specified schema
+elements = map(verify_obj, elements)
 # converts python data to sql data
 elements = [schema.to_sql_type(element) for element in elements]
-return self._from_elements(map(verify_obj, elements), schema)
+return self._from_elements(elements, schema)
 
 Review comment:
   Good catch!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8905: [FLINK-13006][hive] remove GenericUDTFReplicateRows from GenericUDTFTest because Hive 1.2.1 doesn't have it

2019-06-26 Thread GitBox
flinkbot commented on issue #8905: [FLINK-13006][hive] remove 
GenericUDTFReplicateRows from GenericUDTFTest because Hive 1.2.1 doesn't have it
URL: https://github.com/apache/flink/pull/8905#issuecomment-506129625
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 opened a new pull request #8905: [FLINK-13006][hive] remove GenericUDTFReplicateRows from GenericUDTFTest because Hive 1.2.1 doesn't have it

2019-06-26 Thread GitBox
bowenli86 opened a new pull request #8905: [FLINK-13006][hive] remove 
GenericUDTFReplicateRows from GenericUDTFTest because Hive 1.2.1 doesn't have it
URL: https://github.com/apache/flink/pull/8905
 
 
   ## What is the purpose of the change
   
   This PR removes GenericUDTFReplicateRows from HiveGenericUDTFTest because 
Hive 1.2.1 doesn't have it and thus the build profile for Hive 1.2.1 would fail.
   
   I've added quite a lot of test coverage for HiveGenericUDTF, so removing just 
this one test should be fine.
   
   ## Brief change log
   
   - remove GenericUDTFReplicateRows from GenericUDTFTest
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8889: [FLINK-12934][hive] add additional dependencies for flink-connector-hive to connect to remote hive metastore service

2019-06-26 Thread GitBox
bowenli86 commented on issue #8889: [FLINK-12934][hive] add additional 
dependencies for flink-connector-hive to connect to remote hive metastore 
service
URL: https://github.com/apache/flink/pull/8889#issuecomment-506129381
 
 
   cc @xuefuz @lirui-apache @zjuwangg 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8905: [FLINK-13006][hive] remove GenericUDTFReplicateRows from GenericUDTFTest because Hive 1.2.1 doesn't have it

2019-06-26 Thread GitBox
bowenli86 commented on issue #8905: [FLINK-13006][hive] remove 
GenericUDTFReplicateRows from GenericUDTFTest because Hive 1.2.1 doesn't have it
URL: https://github.com/apache/flink/pull/8905#issuecomment-506129301
 
 
   cc @xuefuz @lirui-apache @zjuwangg


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-13006) remove GenericUDTFReplicateRows from GenericUDTFTest because Hive 1.2.1 doesn't have it

2019-06-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-13006:
---
Labels: pull-request-available  (was: )

> remove GenericUDTFReplicateRows from GenericUDTFTest because Hive 1.2.1 
> doesn't have it
> ---
>
> Key: FLINK-13006
> URL: https://issues.apache.org/jira/browse/FLINK-13006
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot edited a comment on issue #8894: [FLINK-12961][datastream] Providing an internal execution method of StreamExecutionEnvironment accepting StreamGraph as input parameter

2019-06-26 Thread GitBox
flinkbot edited a comment on issue #8894:  [FLINK-12961][datastream] Providing 
an internal execution method of StreamExecutionEnvironment accepting 
StreamGraph as input parameter
URL: https://github.com/apache/flink/pull/8894#issuecomment-505774524
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❗ 3. Needs [attention] from.
   - Needs attention by @aljoscha [PMC]
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12953) View logs from Job view in Web Dashboard

2019-06-26 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873759#comment-16873759
 ] 

vinoyang commented on FLINK-12953:
--

Hi [~rmetzger] Agree. In our streaming platform, we have a strong requirement to 
list old log files for viewing. For long-running streaming applications, we 
usually configure a rolling-file strategy in the logging framework (log4j or 
logback); otherwise the size of a single log file would become very large. 
Currently Flink only allows users to view the latest log file, so it is not easy 
for users to locate problems that happened some time ago.
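
For reference, a typical log4j 1.x rolling-file setup of the kind mentioned above (values are illustrative, not a recommended Flink configuration):

{code}
log4j.rootLogger=INFO, rolling

log4j.appender.rolling=org.apache.log4j.RollingFileAppender
log4j.appender.rolling.File=${log.file}
# roll to a new file after 100 MB and keep the last 10 old files
log4j.appender.rolling.MaxFileSize=100MB
log4j.appender.rolling.MaxBackupIndex=10
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
log4j.appender.rolling.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
{code}

With such a setup only the newest file (e.g. taskmanager.log) is shown in the web UI, while the rolled files (taskmanager.log.1, taskmanager.log.2, ...) stay on disk and are currently not viewable there.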

> View logs from Job view in Web Dashboard
> 
>
> Key: FLINK-12953
> URL: https://issues.apache.org/jira/browse/FLINK-12953
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Reporter: Chad Dombrova
>Priority: Major
>
> As a (beam) developer I want to be able to print/log information from my 
> custom transforms, and then monitor that output within the job view of the 
> Web Dashboard, so that I don't have to go hunting through the combined log in 
> the Job Manager view.   The Job Manager log has way too much in it, spanning 
> all jobs, including output logged by both flink and user code.
> A good example of how this UX should work can be found in Google Dataflow:  
> - click on a job, and see the logged output for that job
> - click on a transform, and see the logged output for just that transform
> thanks!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] ifndef-SleePy commented on issue #8894: [FLINK-12961][datastream] Providing an internal execution method of StreamExecutionEnvironment accepting StreamGraph as input parameter

2019-06-26 Thread GitBox
ifndef-SleePy commented on issue #8894:  [FLINK-12961][datastream] Providing an 
internal execution method of StreamExecutionEnvironment accepting StreamGraph 
as input parameter
URL: https://github.com/apache/flink/pull/8894#issuecomment-506124650
 
 
   @flinkbot attention @aljoscha 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-13006) remove GenericUDTFReplicateRows from GenericUDTFTest because Hive 1.2.1 doesn't have it

2019-06-26 Thread Bowen Li (JIRA)
Bowen Li created FLINK-13006:


 Summary: remove GenericUDTFReplicateRows from GenericUDTFTest 
because Hive 1.2.1 doesn't have it
 Key: FLINK-13006
 URL: https://issues.apache.org/jira/browse/FLINK-13006
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.9.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-13004) Correct the logic of needToCleanupState in KeyedProcessFunctionWithCleanupState

2019-06-26 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873756#comment-16873756
 ] 

Jark Wu commented on FLINK-13004:
-

I think the {{timestamp}} parameter should be primitive long. Thanks for 
reporting this.

Could you elaborate more on why, for values larger than 127L, the result would 
return false?

> Correct the logic of needToCleanupState in 
> KeyedProcessFunctionWithCleanupState
> ---
>
> Key: FLINK-13004
> URL: https://issues.apache.org/jira/browse/FLINK-13004
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>
> The current implementation of needToCleanupState in 
> KeyedProcessFunctionWithCleanupState actually has a potential bug:
> {code:java}
> protected Boolean needToCleanupState(Long timestamp) throws IOException {
>if (stateCleaningEnabled) {
>   Long cleanupTime = cleanupTimeState.value();
>   // check that the triggered timer is the last registered processing 
> time timer.
>   return null != cleanupTime && timestamp == cleanupTime;
>} else {
>   return false;
>}
> }
> {code}
> Please note that it directly uses "==" to judge whether the *Long*-typed timestamp 
> and cleanupTime are equal. However, if the value is larger than 127L, the 
> comparison would actually return false instead of the wanted true.
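
For illustration, the boxed-Long pitfall in isolation (standard Java autoboxing behaviour: only values in [-128, 127] are served from the Long cache, so "==" on larger boxed values compares distinct object references):

{code:java}
public class BoxedLongComparison {
	public static void main(String[] args) {
		Long small1 = 127L, small2 = 127L;
		Long big1 = 128L, big2 = 128L;

		// cached values: "==" happens to be true
		System.out.println(small1 == small2);  // true
		// larger values are boxed into distinct objects: "==" compares references
		System.out.println(big1 == big2);      // false
		// comparing by value (or using primitive long) gives the intended result
		System.out.println(big1.equals(big2)); // true
	}
}
{code}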



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12771) Support ConnectorCatalogTable in HiveCatalog

2019-06-26 Thread Bowen Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873755#comment-16873755
 ] 

Bowen Li commented on FLINK-12771:
--

Temporarily putting this ticket on hold and removing it from 1.9.0.

> Support ConnectorCatalogTable in HiveCatalog
> 
>
> Key: FLINK-12771
> URL: https://issues.apache.org/jira/browse/FLINK-12771
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently {{HiveCatalog}} does not support {{ConnectorCatalogTable}}. This is a 
> major drawback in real use cases: when Table API users set a {{HiveCatalog}} as 
> their default catalog (which is very likely), they can no longer create or use 
> any inline table sources/sinks with their default catalog. That makes it really 
> inconvenient for Table API users to use Flink for exploration, experimentation, 
> and production.
> There are several workarounds in this case. E.g. users can switch their 
> default catalog, but that misses our original intention of having a default 
> {{HiveCatalog}}; or users can register their inline sources/sinks in Flink's 
> default catalog, which is an in-memory catalog, but that not only requires users 
> to type the full path of a table but also requires them to be aware of Flink's 
> default catalog, default db, and their names. In short, none of the workarounds 
> seems reasonable or user friendly.
> From another perspective, Hive has the concept of temporary tables that are 
> stored in the memory of the Hive metastore client and are removed when the 
> client is shut down. In Flink, {{ConnectorCatalogTable}} can be seen as a type 
> of session-based temporary table, and {{HiveCatalog}} (potentially any catalog 
> implementation) can store it in memory. By introducing the concept of temporary 
> tables, we could greatly reduce friction for users and improve their experience 
> and productivity.
> Thus, we propose adding a simple in-memory map for {{ConnectorCatalogTable}} 
> in {{HiveCatalog}} to allow users to create and use inline sources/sinks when 
> their default catalog is a {{HiveCatalog}}.
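
To make the proposal concrete, a rough, self-contained sketch of the idea follows. 
The names used here ({{TempTableLayer}}, {{createTempTable}}, and the placeholder 
{{CatalogTable}} interface) are illustrative assumptions, not Flink's actual 
{{HiveCatalog}} API:

{code:java}
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of a session-scoped, in-memory layer for "temporary" tables that is
// consulted before the metastore-backed lookup and discarded when the client
// (session) is closed.
public class TempTableLayer {
    // Placeholder for Flink's ConnectorCatalogTable.
    public interface CatalogTable {}

    // Keyed by "database.table"; lives only in the catalog client's memory.
    private final Map<String, CatalogTable> tempTables = new ConcurrentHashMap<>();

    /** Register an inline source/sink table for the current session only. */
    public void createTempTable(String dbAndTableName, CatalogTable table) {
        tempTables.put(dbAndTableName, table);
    }

    /** Look up a temp table; callers would fall back to the metastore on empty. */
    public Optional<CatalogTable> getTempTable(String dbAndTableName) {
        return Optional.ofNullable(tempTables.get(dbAndTableName));
    }

    /** Temp tables vanish when the session (catalog client) shuts down. */
    public void close() {
        tempTables.clear();
    }
}
{code}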



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12771) Support ConnectorCatalogTable in HiveCatalog

2019-06-26 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-12771:
-
Fix Version/s: (was: 1.9.0)

> Support ConnectorCatalogTable in HiveCatalog
> 
>
> Key: FLINK-12771
> URL: https://issues.apache.org/jira/browse/FLINK-12771
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently {{HiveCatalog}} does not support {{ConnectorCatalogTable}}. This is a 
> major drawback in real use cases: when Table API users set a {{HiveCatalog}} as 
> their default catalog (which is very likely), they can no longer create or use 
> any inline table sources/sinks with their default catalog. That makes it really 
> inconvenient for Table API users to use Flink for exploration, experimentation, 
> and production.
> There are several workarounds in this case. E.g. users can switch their 
> default catalog, but that misses our original intention of having a default 
> {{HiveCatalog}}; or users can register their inline sources/sinks in Flink's 
> default catalog, which is an in-memory catalog, but that not only requires users 
> to type the full path of a table but also requires them to be aware of Flink's 
> default catalog, default db, and their names. In short, none of the workarounds 
> seems reasonable or user friendly.
> From another perspective, Hive has the concept of temporary tables that are 
> stored in the memory of the Hive metastore client and are removed when the 
> client is shut down. In Flink, {{ConnectorCatalogTable}} can be seen as a type 
> of session-based temporary table, and {{HiveCatalog}} (potentially any catalog 
> implementation) can store it in memory. By introducing the concept of temporary 
> tables, we could greatly reduce friction for users and improve their experience 
> and productivity.
> Thus, we propose adding a simple in-memory map for {{ConnectorCatalogTable}} 
> in {{HiveCatalog}} to allow users to create and use inline sources/sinks when 
> their default catalog is a {{HiveCatalog}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-13004) Correct the logic of needToCleanupState in KeyedProcessFunctionWithCleanupState

2019-06-26 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-13004:

Component/s: Table SQL / Planner

> Correct the logic of needToCleanupState in 
> KeyedProcessFunctionWithCleanupState
> ---
>
> Key: FLINK-13004
> URL: https://issues.apache.org/jira/browse/FLINK-13004
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>
> The current implementation of needToCleanupState in 
> KeyedProcessFunctionWithCleanupState actually has a potential bug:
> {code:java}
> protected Boolean needToCleanupState(Long timestamp) throws IOException {
>    if (stateCleaningEnabled) {
>       Long cleanupTime = cleanupTimeState.value();
>       // check that the triggered timer is the last registered processing time timer.
>       return null != cleanupTime && timestamp == cleanupTime;
>    } else {
>       return false;
>    }
> }
> {code}
> Please note that it directly uses "==" to judge whether the *Long*-typed 
> timestamp and cleanupTime are equal. However, if that value is larger than 127L, 
> the result would actually be false instead of the expected true.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

