[GitHub] [flink] yumengz5 edited a comment on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint
yumengz5 edited a comment on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint URL: https://github.com/apache/flink/pull/8617#issuecomment-506202631 In my opinion, if the checkpoint times out, it will be discarded, but the checkpoint coordinator handles such an abort as usual and assumes that the Flink job continues running. In fact, the job looks like it is "running", but it doesn't process any records. In the sync-checkpoint case, every task is blocked after triggering the checkpoint. Unblocking only happens either when the task is cancelled or when the corresponding checkpoint is acknowledged. I guess stop-with-savepoint may have the same issue. I'm not sure if it will truly happen.
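To illustrate the blocking pattern described here, a minimal, hypothetical sketch (simplified names, not Flink's actual task code) of a task that stops processing until the checkpoint is acknowledged or the task is cancelled:

final class SyncCheckpointBlockingSketch {
    private final Object checkpointLock = new Object();
    private boolean checkpointAcknowledged;
    private boolean taskCancelled;

    // Called after the synchronous checkpoint is triggered: the task stops
    // processing records here. If the coordinator discards the checkpoint on
    // timeout without unblocking the task, the job still looks RUNNING while
    // processing nothing, which is the situation described above.
    void blockUntilAckOrCancel() throws InterruptedException {
        synchronized (checkpointLock) {
            while (!checkpointAcknowledged && !taskCancelled) {
                checkpointLock.wait();
            }
        }
    }

    void acknowledgeCheckpoint() {
        synchronized (checkpointLock) {
            checkpointAcknowledged = true;
            checkpointLock.notifyAll();
        }
    }

    void cancelTask() {
        synchronized (checkpointLock) {
            taskCancelled = true;
            checkpointLock.notifyAll();
        }
    }
}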
[GitHub] [flink] wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.
wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure. URL: https://github.com/apache/flink/pull/8880#discussion_r298015077
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java ##
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+final class FileChannelMemoryMappedBoundedData implements BoundedData {
+
+    /** The file channel backing the memory mapped file. */
+    private final FileChannel fileChannel;
+
+    /** The reusable array with header buffer and data buffer, to use gathering writes on the
+     * file channel ({@link java.nio.channels.GatheringByteChannel#write(ByteBuffer[])}). */
+    private final ByteBuffer[] headerAndBufferArray;
+
+    /** All memory mapped regions. */
+    private final ArrayList<ByteBuffer> memoryMappedRegions;
+
+    /** The path of the memory mapped file. */
+    private final Path filePath;
+
+    /** The position in the file channel. Cached for efficiency, because an actual position
+     * lookup in the channel involves various locks and checks. */
+    private long pos;
+
+    /** The position where the current memory mapped region must end. */
+    private long endOfCurrentRegion;
+
+    /** The position where the current memory mapped region started. */
+    private long startOfCurrentRegion;
+
+    /** The maximum size of each mapped region. */
+    private final long maxRegionSize;
+
+    FileChannelMemoryMappedBoundedData(
+            Path filePath,
+            FileChannel fileChannel,
+            int maxSizePerMappedRegion) throws IOException {
+
+        this.filePath = filePath;
+        this.fileChannel = fileChannel;
+        this.headerAndBufferArray = BufferReaderWriterUtil.allocatedWriteBufferArray();
+        this.memoryMappedRegions = new ArrayList<>(4);
+        this.maxRegionSize = maxSizePerMappedRegion;
+        this.endOfCurrentRegion = maxSizePerMappedRegion;
+    }
+
+    @Override
+    public void writeBuffer(Buffer buffer) throws IOException {
+        if (tryWriteBuffer(buffer)) {
+            return;
+        }
+
+        mapRegionAndStartNext();
+
+        if (!tryWriteBuffer(buffer)) {
+            throwTooLargeBuffer(buffer);
+        }
+    }
+
+    private boolean tryWriteBuffer(Buffer buffer) throws IOException {
+        final long spaceLeft = endOfCurrentRegion - pos;
+        final long bytesWritten = BufferReaderWriterUtil.writeToByteChannelIfBelowSize(
+                fileChannel, buffer, headerAndBufferArray, spaceLeft);
+
+        if (bytesWritten >= 0) {
+            pos += bytesWritten;
+            return true;
+        }
+        else {
+            return false;
+        }
+    }
+
+    @Override
+    public BoundedData.Reader createReader() {
+        checkState(!fileChannel.isOpen());
+
+        final List<ByteBuffer> buffers = memoryMappedRegions.stream()
+                .map((bb) -> bb.duplicate().order(ByteOrder.nativeOrder()))
+                .collect(Collectors.toList());
+
+        return new MemoryMappedBoundedData.BufferSlicer(buffers);
+    }
+
+    /**
[GitHub] [flink] wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.
wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure. URL: https://github.com/apache/flink/pull/8880#discussion_r298015030
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java ##
[jira] [Updated] (FLINK-11388) Add an OSS RecoverableWriter
[ https://issues.apache.org/jira/browse/FLINK-11388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11388: --- Labels: pull-request-available (was: ) > Add an OSS RecoverableWriter > > Key: FLINK-11388 > URL: https://issues.apache.org/jira/browse/FLINK-11388 > Project: Flink > Issue Type: Sub-task > Components: Connectors / FileSystem > Affects Versions: 1.7.1 > Reporter: wujinhu > Assignee: wujinhu > Priority: Major > Labels: pull-request-available > > OSS offers persistence only after uploads or multi-part uploads complete. In order to let streaming jobs use OSS as a sink, we should implement a RecoverableWriter. This writer will snapshot and store multi-part upload information, and recover from that information when a failure occurs.
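As background for the recovery idea described in this issue, the state a recoverable OSS writer would snapshot for resuming a multi-part upload might look roughly like this (an illustrative sketch; the class and field names are assumptions, not the actual classes from the PR):

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Sketch of the information to persist so that an in-progress multi-part
// upload can be resumed after a failure, per the issue description.
final class OssUploadRecoverableSketch implements Serializable {
    final String objectName;       // target OSS object
    final String uploadId;         // id of the multi-part upload session
    final List<String> partETags;  // etags of the parts uploaded so far
    final long numBytesWritten;    // offset at which writing resumes

    OssUploadRecoverableSketch(
            String objectName, String uploadId, List<String> partETags, long numBytesWritten) {
        this.objectName = objectName;
        this.uploadId = uploadId;
        this.partETags = new ArrayList<>(partETags);
        this.numBytesWritten = numBytesWritten;
    }
}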
[GitHub] [flink] wujinhu closed pull request #7798: [FLINK-11388][fs] Add Aliyun OSS recoverable writer
wujinhu closed pull request #7798: [FLINK-11388][fs] Add Aliyun OSS recoverable writer URL: https://github.com/apache/flink/pull/7798
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.
sunjincheng121 commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed. URL: https://github.com/apache/flink/pull/8863#discussion_r298012822 ## File path: docs/flinkDev/building.md ## @@ -58,6 +58,17 @@ mvn clean install -DskipTests -Dfast The default build adds a Flink-specific JAR for Hadoop 2, to allow using Flink with HDFS and YARN. +## Build PyFlink + +If you want to build a PyFlink package that can be used for pip installation, you need to build the Flink jars first, as described in [Build Flink](##Build Flink). +Then go to the root directory of the flink source code and run this command to build an sdist package: + +{% highlight bash %} +cd flink-python; python setup.py sdist Review comment: We also need a `.whl`, right? `python setup.py sdist bdist_wheel`? Details can be found here: https://packaging.python.org/tutorials/packaging-projects/
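For reference, a build-and-install sequence along the lines suggested in the review (standard setuptools/pip usage, assumed rather than taken from this PR):

cd flink-python
python -m pip install wheel          # bdist_wheel requires the wheel package
python setup.py sdist bdist_wheel    # produces both dist/*.tar.gz and dist/*.whl
pip install dist/*.whl               # install the wheel locally for testing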
[GitHub] [flink] JingsongLi commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect all / minus all to blink planner
JingsongLi commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect all / minus all to blink planner URL: https://github.com/apache/flink/pull/8898#discussion_r298013117 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceSetOpWithJoinRuleBase.scala ## @@ -55,4 +71,47 @@ abstract class ReplaceSetOpWithJoinRuleBase[T <: SetOp]( } conditions } + + protected def replicateRows( Review comment: I think `ReplaceSetOpWithJoinRuleBase` is unnecessary, since `generateCondition` is a static method. There could be a `SetOpRewriteUtil` instead.
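As context for the rewrite these rules implement, INTERSECT ALL and MINUS ALL must respect duplicate counts, which is why rows have to be replicated; illustrative SQL showing the expected semantics, not the generated plan:

-- Suppose T1 contains v = a, a, a, b and T2 contains v = a, a, c.
-- INTERSECT ALL keeps each row min(count in T1, count in T2) times: result a, a.
-- MINUS ALL keeps each row max(count in T1 - count in T2, 0) times: result a, b.
SELECT v FROM T1
INTERSECT ALL
SELECT v FROM T2;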
[jira] [Commented] (FLINK-12751) Create file based HA support
[ https://issues.apache.org/jira/browse/FLINK-12751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873830#comment-16873830 ] Yang Wang commented on FLINK-12751: --- [~borisl] Sorry for the late reply. Just as you say, job manager selection is done by a k8s deployment of 1. However, if the kubelet crashes, the pod running on it may not be terminated, so two job managers may be running at the same time. We need to use etcd/zookeeper to do the job manager selection.
> Create file based HA support
> Key: FLINK-12751
> URL: https://issues.apache.org/jira/browse/FLINK-12751
> Project: Flink
> Issue Type: Improvement
> Components: FileSystems
> Affects Versions: 1.8.0, 1.9.0, 2.0.0
> Environment: Flink on k8 and Mini cluster
> Reporter: Boris Lublinsky
> Priority: Major
> Labels: features, pull-request-available
> Original Estimate: 168h
> Time Spent: 10m
> Remaining Estimate: 167h 50m
>
> In the current Flink implementation, HA support can be implemented either using Zookeeper or a custom factory class.
> This adds an HA implementation based on a PVC. The idea behind this implementation is as follows:
> * Because the implementation assumes a single instance of the job manager (job manager selection and restarts are done by a K8s Deployment of 1), URL management is done using the StandaloneHaServices implementation (in the case of a cluster) and the EmbeddedHaServices implementation (in the case of a mini cluster)
> * For management of the submitted job graphs, the checkpoint counter, and completed checkpoints, the implementation leverages the following file system layout:
> {code}
> ha -> root of the HA data
>   checkpointcounter -> checkpoint counter folder
>     -> job id folder
>       -> counter file
>     -> another job id folder
>     ...
>   completedCheckpoint -> completed checkpoint folder
>     -> job id folder
>       -> checkpoint file
>       -> checkpoint file
>       ...
>     -> another job id folder
>     ...
>   submittedJobGraph -> submitted graph folder
>     -> job id folder
>       -> graph file
>     -> another job id folder
>     ...
> {code}
> The implementation overrides 2 of the Flink files:
> * HighAvailabilityServicesUtils - added `FILESYSTEM` option for picking the HA service
> * HighAvailabilityMode - added `FILESYSTEM` to the available HA options.
> The actual implementation adds the following classes:
> * `FileSystemHAServices` - an implementation of `HighAvailabilityServices` for file systems
> * `FileSystemUtils` - support class for creation of runtime components
> * `FileSystemStorageHelper` - file system operations implementation for filesystem based HA
> * `FileSystemCheckpointRecoveryFactory` - an implementation of `CheckpointRecoveryFactory` for file systems
> * `FileSystemCheckpointIDCounter` - an implementation of `CheckpointIDCounter` for file systems
> * `FileSystemCompletedCheckpointStore` - an implementation of `CompletedCheckpointStore` for file systems
> * `FileSystemSubmittedJobGraphStore` - an implementation of `SubmittedJobGraphStore` for file systems
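As a concrete illustration of the layout above, a minimal file-backed checkpoint counter could look like the following sketch (it follows the described ha/checkpointcounter/&lt;job id&gt;/counter layout, but is not the actual FileSystemCheckpointIDCounter from the PR):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class FileCheckpointCounterSketch {
    private final Path counterFile;

    FileCheckpointCounterSketch(Path haRoot, String jobId) {
        // Matches the layout from the issue: ha/checkpointcounter/<job id>/counter
        this.counterFile = haRoot.resolve("checkpointcounter").resolve(jobId).resolve("counter");
    }

    // Reads the last counter value from the file, writes back the incremented
    // value, and returns the previous one.
    long getAndIncrement() throws IOException {
        long current = 0L;
        if (Files.exists(counterFile)) {
            String content = new String(Files.readAllBytes(counterFile), StandardCharsets.UTF_8);
            current = Long.parseLong(content.trim());
        }
        Files.createDirectories(counterFile.getParent());
        Files.write(counterFile, Long.toString(current + 1).getBytes(StandardCharsets.UTF_8));
        return current;
    }
}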
[GitHub] [flink] wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.
wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure. URL: https://github.com/apache/flink/pull/8880#discussion_r298012847
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java ##
+    private boolean tryWriteBuffer(Buffer buffer) throws IOException {
+        final long spaceLeft = endOfCurrentRegion - pos;
+        final long bytesWritten = BufferReaderWriterUtil.writeToByteChannelIfBelowSize(
+                fileChannel, buffer, headerAndBufferArray, spaceLeft);
+
+        if (bytesWritten >= 0) {
+            pos += bytesWritten;
+            return true;
+        }
+        else {
Review comment: The `else` can be removed.
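Applied to the quoted method, the suggested change would read roughly as follows (same behaviour, just without the redundant else branch):

private boolean tryWriteBuffer(Buffer buffer) throws IOException {
    final long spaceLeft = endOfCurrentRegion - pos;
    final long bytesWritten = BufferReaderWriterUtil.writeToByteChannelIfBelowSize(
            fileChannel, buffer, headerAndBufferArray, spaceLeft);

    // A negative return value signals that the buffer did not fit into the
    // space left in the current region.
    if (bytesWritten >= 0) {
        pos += bytesWritten;
        return true;
    }
    return false;
}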
[jira] [Updated] (FLINK-13011) Release the PyFlink into PyPI
[ https://issues.apache.org/jira/browse/FLINK-13011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-13011: --- Description: FLINK-12962 adds the ability to build a PyFlink distribution package, but we have not yet released PyFlink to PyPI. The goal of this JIRA is to publish the PyFlink distribution package built by FLINK-12962 to PyPI. https://pypi.org/ https://packaging.python.org/tutorials/packaging-projects/ (was: FLINK-12962 adds the ability to build a PyFlink distribution package, but we have not yet released PyFlink to PyPI. The goal of this JIRA is to publish the PyFlink distribution package built by FLINK-12962 to PyPI. https://pypi.org/) > Release the PyFlink into PyPI > - > > Key: FLINK-13011 > URL: https://issues.apache.org/jira/browse/FLINK-13011 > Project: Flink > Issue Type: Sub-task > Components: API / Python, Build System > Affects Versions: 1.9.0 > Reporter: sunjincheng > Priority: Major > > FLINK-12962 adds the ability to build a PyFlink distribution package, but we have not yet released PyFlink to PyPI. The goal of this JIRA is to publish the PyFlink distribution package built by FLINK-12962 to PyPI. > https://pypi.org/ > https://packaging.python.org/tutorials/packaging-projects/
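Following the linked packaging tutorial, the upload step would presumably use twine (the standard PyPI workflow; these commands are an assumption, not something specified in this issue):

python -m pip install --upgrade twine
python -m twine upload dist/*    # uploads the sdist and wheel built by FLINK-12962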
[jira] [Updated] (FLINK-12999) Can't generate valid execution plan for "SELECT uuid() FROM (VALUES(1,2,3)) T(a, b, c)"
[ https://issues.apache.org/jira/browse/FLINK-12999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-12999: --- Summary: Can't generate valid execution plan for "SELECT uuid() FROM (VALUES(1,2,3)) T(a, b, c)" (was: Can't generate valid execution plan for "SELECT uuid() FROM VALUES(1,2,3) T(a, b, c)")
> Can't generate valid execution plan for "SELECT uuid() FROM (VALUES(1,2,3)) T(a, b, c)"
> Key: FLINK-12999
> URL: https://issues.apache.org/jira/browse/FLINK-12999
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.9.0
> Reporter: Zhenghua Gao
> Assignee: godfrey he
> Priority: Major
>
> The ERROR message is:
>
> org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:
> LogicalSink(fields=[EXPR$0])
> +- LogicalProject(EXPR$0=[UUID()])
>    +- LogicalValues(tuples=[[{ 1, 2, 3 }]])
> This exception indicates that the query uses an unsupported SQL feature. Please check the documentation for the set of currently supported SQL features.
> at org.apache.flink.table.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:72)
> at org.apache.flink.table.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> at org.apache.flink.table.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at org.apache.flink.table.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> at org.apache.flink.table.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:82)
> at org.apache.flink.table.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:51)
> at org.apache.flink.table.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:39)
> at org.apache.flink.table.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:39)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at org.apache.flink.table.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:39)
> at org.apache.flink.table.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:65)
> at org.apache.flink.table.api.TableEnvironment.optimize(TableEnvironment.scala:251)
> at org.apache.flink.table.api.TableEnvironment.compileToExecNodePlan(TableEnvironment.scala:200)
> at org.apache.flink.table.api.TableEnvironment.compile(TableEnvironment.scala:184)
> at org.apache.flink.table.api.TableEnvironment.generateStreamGraph(TableEnvironment.scala:155)
> at org.apache.flink.table.api.BatchTableEnvironment.execute(BatchTableEnvironment.scala:93)
> at org.apache.flink.table.api.TableEnvironment.execute(TableEnvironment.scala:136)
> at org.apache.flink.table.runtime.utils.BatchTableEnvUtil$.collect(BatchTableEnvUtil.scala:55)
> at org.apache.flink.table.runtime.utils.TableUtil$.collectSink(TableUtil.scala:60)
> at org.apache.flink.table.runtime.utils.TableUtil$.collect(TableUtil.scala:41)
> at org.apache.flink.table.runtime.utils.BatchTestBase.executeQuery(BatchTestBase.scala:308)
> at org.apache.flink.table.runtime.utils.BatchTestBase.check(BatchTestBase.scala:164)
> at org.apache.flink.table.runtime.utils.BatchTestBase.checkResult(BatchTestBase.scala:103)
> at org.apache.flink.table.runtime.batch.sql.ValuesITCase.test(ValuesITCase.scala:38)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
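For reference, the query form from the updated summary (a VALUES row constructor used as a table reference needs the surrounding parentheses, which is presumably why the title was corrected):

-- The query from the issue title; per this report, the blink planner currently
-- fails with "Cannot generate a valid execution plan for the given query".
SELECT uuid() FROM (VALUES (1, 2, 3)) T(a, b, c);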
[jira] [Updated] (FLINK-12882) Remove ExecutionAttemptID field from ShuffleEnvironment#createResultPartitionWriters
[ https://issues.apache.org/jira/browse/FLINK-12882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-12882: --- Description: The ResultPartitionID can be obtained directly from the ResultPartitionDeploymentDescriptor, so there is no need to pass the ExecutionAttemptID to construct a new ResultPartitionID when creating the ResultPartition in the factory. (was: The {{ExecutionAttemptID}} is only used for constructing the {{ResultPartitionID}} during creation of the {{ResultPartitionWriters}} in the {{ShuffleEnvironment}}. Actually, the {{ResultPartitionID}} can be obtained directly from the {{ResultPartitionDeploymentDescriptor}} via {{ShuffleDescriptor#getResultPartitionID}}, so we can avoid passing this field in the interface to keep it simple.) > Remove ExecutionAttemptID field from > ShuffleEnvironment#createResultPartitionWriters > > Key: FLINK-12882 > URL: https://issues.apache.org/jira/browse/FLINK-12882 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network > Reporter: zhijiang > Assignee: zhijiang > Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > The ResultPartitionID can be obtained directly from the ResultPartitionDeploymentDescriptor, so there is no need to pass the ExecutionAttemptID to construct a new ResultPartitionID when creating the ResultPartition in the factory.
[jira] [Updated] (FLINK-12882) Remove ExecutionAttemptID argument from ResultPartitionFactory#create
[ https://issues.apache.org/jira/browse/FLINK-12882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-12882: --- Summary: Remove ExecutionAttemptID argument from ResultPartitionFactory#create (was: Remove ExecutionAttemptID field from ShuffleEnvironment#createResultPartitionWriters) > Remove ExecutionAttemptID argument from ResultPartitionFactory#create > - > > Key: FLINK-12882 > URL: https://issues.apache.org/jira/browse/FLINK-12882 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network > Reporter: zhijiang > Assignee: zhijiang > Priority: Minor > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > The ResultPartitionID can be obtained directly from the ResultPartitionDeploymentDescriptor, so there is no need to pass the ExecutionAttemptID to construct a new ResultPartitionID when creating the ResultPartition in the factory.
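Schematically, the change amounts to reading the ID off the descriptor instead of reassembling it (a simplified sketch; the variable names and exact signatures are assumptions, not the actual factory code):

// Before (simplified): the ID is rebuilt from the partition id plus the
// ExecutionAttemptID that had to be threaded through the factory.
ResultPartitionID idBefore = new ResultPartitionID(desc.getPartitionId(), executionAttemptId);

// After (simplified): the ShuffleDescriptor inside the deployment descriptor
// already carries the complete ResultPartitionID.
ResultPartitionID idAfter = desc.getShuffleDescriptor().getResultPartitionID();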
[GitHub] [flink] godfreyhe commented on issue #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
godfreyhe commented on issue #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner URL: https://github.com/apache/flink/pull/8898#issuecomment-506192171 Update the commit message to `Support intersect all/minus all in blink planner`?
[GitHub] [flink] knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough
knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough URL: https://github.com/apache/flink/pull/8903#discussion_r298004677
## File path: docs/getting-started/tutorials/table_api.md ##
[GitHub] [flink] knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough
knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough URL: https://github.com/apache/flink/pull/8903#discussion_r298003160
## File path: docs/getting-started/tutorials/table_api.md ##
[GitHub] [flink] knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough
knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough URL: https://github.com/apache/flink/pull/8903#discussion_r298005605
## File path: docs/getting-started/tutorials/table_api.md ##
[GitHub] [flink] knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough
knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough URL: https://github.com/apache/flink/pull/8903#discussion_r298002892 ## File path: docs/getting-started/tutorials/table_api.md ## @@ -0,0 +1,250 @@ +--- +title: "Table API" +nav-id: tableapitutorials +nav-title: 'Table API' +nav-parent_id: apitutorials +nav-pos: 1 +--- + + +Apache Flink offers a Table API as a unified, relational API for batch and stream processing, i.e., queries are executed with the same semantics on unbounded, real-time streams or bounded, recorded streams and produce the same results. +The Table API in Flink is commonly used to ease the definition of data analytics, data pipelining, and ETL applications. + +* This will be replaced by the TOC +{:toc} + +## What Are We Building? + +In this tutorial, we'll show how to build a continuous ETL pipeline for tracking financial transactions by account over time. +We will start by building our report as a nightly batch job, and then migrate to a streaming pipeline to see how batch is just a special case of streaming. + +## Prerequisites + +We'll assume that you have some familiarity with Java or Scala, but you should be able to follow along even if you're coming from a different programming language. +We'll also assume that you're familiar with basic relational concepts such as `SELECT` and `GROUP BY` clauses. + +If you want to follow along you will require a computer with: + +* Java 8 +* Maven + +## Help, I’m Stuck! + +If you get stuck, check out the [community support resources](https://flink.apache.org/community.html). +In particular, Apache Flink's [user mailing list](https://flink.apache.org/community.html#mailing-lists) is consistently ranked as one of the most active of any Apache project and a great way to get help quickly. + +## Setting up a Maven Project + +We are going to use a Flink Maven Archetype for creating our project structure. + +{% highlight bash %} +$ mvn archetype:generate \ +-DarchetypeGroupId=org.apache.flink \ +-DarchetypeArtifactId=flink-walkthrough-table \{% unless site.is_stable %} + -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %} +-DarchetypeVersion={{ site.version }} \ +-DgroupId=spend-report \ +-DartifactId=spend-report \ +-Dversion=0.1 \ +-Dpackage=spendreport \ +-DinteractiveMode=false +{% endhighlight %} + +{% unless site.is_stable %} + +Note: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) via the commandline. If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For details about this change, please refer to the Maven official document: http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html + +{% endunless %} + +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters, +Maven will create a project with all the dependencies to complete this tutorial. +After importing the project in your editor you will see a file with the following code.
+ +{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); + +tEnv.registerTableSource("transactions", new TransactionTableSource()); +tEnv.registerTableSink("stdout", new StdOutTableSink()); +tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour()); + +tEnv + .scan("transactions") + .insertInto("stdout"); + +env.execute("Spend Report"); +{% endhighlight %} + +Let's break down this code by component. + +## Breaking Down The Code + + The Execution Environment + +The first two lines set up our `ExecutionEnvironment`. +The execution environment is how we set properties for our deployments, specify whether we are writing a batch or streaming application, and create our sources. +Here we have chosen to use the batch environment since we are building a periodic batch report. +We then wrap it in a `BatchTableEnvironment` so to have full access to the Table Api. + +{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); +{% endhighlight %} + + Registering Tables + +Next, we register tables that we can use for the input and output of our application. +The TableEnvironment maintains a catalog of tables that are registered by name. There are two types of tables, input tables and output tables. +Input tables can be referenced in Table API and SQL queries and provide input data. +Output tables can be used to emit the result of a Table API or SQL query to an external system. +Tables can support batch queries, streaming queries, or both. + +{% highlight java %} Review comment: We should also add Scala here, shouldn't we?
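For example, the Scala tab could mirror the Java snippet along these lines (a sketch of what the walkthrough's Scala variant might look like; the helper classes are assumed to be the same as in the Java version quoted above):

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)

// Register the input table, the output table, and the scalar function
// under the names used by the walkthrough.
tEnv.registerTableSource("transactions", new TransactionTableSource())
tEnv.registerTableSink("stdout", new StdOutTableSink())
tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour())

tEnv
  .scan("transactions")
  .insertInto("stdout")

env.execute("Spend Report")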
[GitHub] [flink] knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough
knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough URL: https://github.com/apache/flink/pull/8903#discussion_r298004640 ## File path: docs/getting-started/tutorials/table_api.md ## @@ -0,0 +1,250 @@ +--- +title: "Table API" +nav-id: tableapitutorials +nav-title: 'Table API' +nav-parent_id: apitutorials +nav-pos: 1 +--- + + +Apache Flink offers a Table API as a unified, relational API for batch and stream processing, i.e., queries are executed with the same semantics on unbounded, real-time streams or bounded, recorded streams and produce the same results. +The Table API in Flink is commonly used to ease the definition of data analytics, data pipelining, and ETL applications. + +* This will be replaced by the TOC +{:toc} + +## What Are We Building? + +In this tutorial, we'll show how to build a continuous ETL pipeline for tracking financial transactions by account over time. +We will start by building our report as a nightly batch job, and then migrate to a streaming pipeline to see how batch is just a special case of streaming. + +## Prerequisites + +We'll assume that you have some familiarity with Java or Scala, but you should be able to follow along even if you're coming from a different programming language. +We'll also assume that you're familiar with basic relational concepts such as `SELECT` and `GROUP BY` clauses. + +If you want to follow along you will require a computer with: + +* Java 8 +* Maven + +## Help, I’m Stuck! + +If you get stuck, check out the [community support resources](https://flink.apache.org/community.html). +In particular, Apache Flink's [user mailing](https://flink.apache.org/community.html#mailing-lists) list is consistently rated as one of the most active of any Apache project and a great way to get help quickly. + +## Setting up a Maven Project + +We are going to use a Flink Maven Archetype for creating our project structure. + +{% highlight bash %} +$ mvn archetype:generate \ +-DarchetypeGroupId=org.apache.flink \ +-DarchetypeArtifactId=flink-walkthrough-table \{% unless site.is_stable %} + -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \{% endunless %} +-DarchetypeVersion={{ site.version }} \ +-DgroupId=spend-report \ +-DartifactId=spend-report \ +-Dversion=0.1 \ +-Dpackage=spendreport \ +-DinteractiveMode=false +{% endhighlight %} + +{% unless site.is_stable %} + +Note: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) via the commandline. If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For details about this change, please refer to http://maven.apache.org/archetype/maven-archetype-plugin/archetype-repository.html;>Maven official document + +{% endunless %} + +You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters, +Maven will create a project with all the dependencies to complete this tutorial. +After importing the project in your editor you will see a file following code. 
+
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+
+tEnv.registerTableSource("transactions", new TransactionTableSource());
+tEnv.registerTableSink("stdout", new StdOutTableSink());
+tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
+
+tEnv
+    .scan("transactions")
+    .insertInto("stdout");
+
+env.execute("Spend Report");
+{% endhighlight %}
+
+Let's break down this code by component.
+
+## Breaking Down The Code
+
+### The Execution Environment
+
+The first two lines set up our `ExecutionEnvironment`.
+The execution environment is how we set properties for our deployments, specify whether we are writing a batch or streaming application, and create our sources.
+Here we have chosen to use the batch environment since we are building a periodic batch report.
+We then wrap it in a `BatchTableEnvironment` so as to have full access to the Table API.
+
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
+{% endhighlight %}
+
+### Registering Tables
+
+Next, we register the tables that we can use for the input and output of our application.
+The `TableEnvironment` maintains a catalog of tables that are registered by name. There are two types of tables: input tables and output tables.
+Input tables can be referenced in Table API and SQL queries and provide input data.
+Output tables can be used to emit the result of a Table API or SQL query to an external system.
+Tables can support batch queries, streaming queries, or both.
+
+{% highlight java %}
+tEnv.registerTableSource("transactions", new TransactionTableSource());
[GitHub] [flink] knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough
knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough
URL: https://github.com/apache/flink/pull/8903#discussion_r298003884

## File path: docs/getting-started/tutorials/table_api.md
[GitHub] [flink] knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough
knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough
URL: https://github.com/apache/flink/pull/8903#discussion_r298004962

## File path: docs/getting-started/tutorials/table_api.md
[GitHub] [flink] knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough
knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough
URL: https://github.com/apache/flink/pull/8903#discussion_r298003032

## File path: docs/getting-started/tutorials/table_api.md
## @@ -0,0 +1,250 @@
+Input tables can be referenced in Table API and SQL queries and provide input data.

Review comment: Maybe mention that this corresponds to "sinks" and "sources" in other frameworks/APIs.
[GitHub] [flink] knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough
knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough
URL: https://github.com/apache/flink/pull/8903#discussion_r298003359

## File path: docs/getting-started/tutorials/table_api.md
[GitHub] [flink] knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough
knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough
URL: https://github.com/apache/flink/pull/8903#discussion_r298004832

## File path: docs/getting-started/tutorials/table_api.md
[GitHub] [flink] knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough
knaufk commented on a change in pull request #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough
URL: https://github.com/apache/flink/pull/8903#discussion_r298003620

## File path: docs/getting-started/tutorials/table_api.md
[jira] [Commented] (FLINK-13004) Correct the logic of needToCleanupState in KeyedProcessFunctionWithCleanupState
[ https://issues.apache.org/jira/browse/FLINK-13004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873815#comment-16873815 ]

Yun Tang commented on FLINK-13004:
----------------------------------

[~jark], actually, the {{timestamp}} passed in is the wrapper type {{Long}}. For the problem of comparing wrapper and primitive values, you can refer to [this question|https://stackoverflow.com/questions/19485818/what-are-not-2-long-variables-equal-with-operator-to-compare-in-java]. I am preparing a PR to fix this and found that previous tests had been modified to cater to this bug.

> Correct the logic of needToCleanupState in KeyedProcessFunctionWithCleanupState
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-13004
>                 URL: https://issues.apache.org/jira/browse/FLINK-13004
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Yun Tang
>            Assignee: Yun Tang
>            Priority: Major
>
> The current implementation of needToCleanupState in KeyedProcessFunctionWithCleanupState actually has a potential bug:
> {code:java}
> protected Boolean needToCleanupState(Long timestamp) throws IOException {
>    if (stateCleaningEnabled) {
>       Long cleanupTime = cleanupTimeState.value();
>       // check that the triggered timer is the last registered processing time timer.
>       return null != cleanupTime && timestamp == cleanupTime;
>    } else {
>       return false;
>    }
> }
> {code}
> Please note that it directly uses "==" to judge whether the *Long*-typed timestamp and cleanupTime are equal. However, if the value is larger than 127L, the comparison would actually return false instead of the wanted true.

This message was sent by Atlassian JIRA (v7.6.3#76005)
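To make the pitfall concrete, here is a minimal self-contained example (illustrative only, not code from the Flink code base): boxed {{Long}} values outside the cache range [-128, 127] are distinct objects, so "==" compares references rather than values.

{code:java}
public class BoxedLongComparison {
    public static void main(String[] args) {
        Long cached = 100L;       // autoboxed via Long.valueOf, served from the Long cache
        Long cachedCopy = 100L;
        Long large = 1000L;       // outside the cache range, a fresh object each time
        Long largeCopy = 1000L;

        System.out.println(cached == cachedCopy); // true: same cached object (this hides the bug in small tests)
        System.out.println(large == largeCopy);   // false: reference comparison of distinct objects

        // Value-based comparisons behave as intended:
        System.out.println(large.equals(largeCopy));                     // true
        System.out.println(large.longValue() == largeCopy.longValue()); // true
    }
}
{code}

Accordingly, one way to fix {{needToCleanupState}} is to keep the null check and then compare values explicitly, e.g. {{cleanupTime.equals(timestamp)}} or unboxing both sides with {{longValue()}}, rather than relying on "==".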
[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297967960

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala
## @@ -0,0 +1,124 @@
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.{GREATER_THAN, GREATER_THAN_OR_EQUAL, IF}
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.Intersect
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.util.Util
+
+import scala.collection.JavaConversions._
+
+/**
+ * Replaces logical [[Intersect]] operator using a combination of Union, Aggregate
+ * and Generate operator.
+ *
+ * Input Query :
+ * {{{
+ *    SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ * SELECT c1
+ * FROM (
+ *    SELECT replicate_row(min_count, c1)

Review comment: A UDTF (`replicate_row`) cannot be used in the `select` clause; please use `LATERAL TABLE(replicate_row(XXX))` instead.
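For readers following the thread: in Flink SQL a table function is applied through a lateral join rather than in the select list. A rough sketch of the difference (assuming a `TableEnvironment` named `tEnv`, an input table `t` with columns `min_count` and `c1`, and a registered UDTF `replicate_rows`; all names here are illustrative, not the PR's actual code):

```java
// Illustrative only. A UDTF cannot sit directly in the SELECT list:
//   SELECT replicate_rows(min_count, c1) FROM t        -- not valid
// It has to be joined in via LATERAL TABLE:
Table result = tEnv.sqlQuery(
    "SELECT r.c1 " +
    "FROM t, LATERAL TABLE(replicate_rows(t.min_count, t.c1)) AS r(c1)");
```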
[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297985745

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteMinusAll.scala
## @@ -0,0 +1,110 @@
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.GREATER_THAN
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.core.Minus
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.SqlTypeName.BIGINT
+import org.apache.calcite.util.Util
+
+import scala.collection.JavaConversions._
+
+/**
+ * Replaces logical [[Minus]] operator using a combination of Union, Aggregate
+ * and Generate operator.
+ *
+ * Input Query :
+ * {{{
+ *    SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ * SELECT c1
+ * FROM (
+ *   SELECT replicate_rows(sum_val, c1)
+ *   FROM (
+ *     SELECT c1, sum_val
+ *     FROM (
+ *       SELECT c1, sum(vcol) AS sum_val
+ *       FROM (
+ *         SELECT c1, 1L as vcol FROM ut1
+ *         UNION ALL
+ *         SELECT c1, -1L as vcol FROM ut2
+ *       ) AS union_all
+ *       GROUP BY union_all.c1
+ *     )
+ *     WHERE sum_val > 0
+ *   )
+ * )
+ * }}}
+ */
+class RewriteMinusAll extends ReplaceSetOpWithJoinRuleBase(
+  classOf[Minus], "RewriteMinusAll") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val minus: Minus = call.rel(0)
+    !minus.isDistinct

Review comment: `minus.all`
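To see why the +1/-1 rewrite yields EXCEPT ALL semantics, here is a small stand-alone sketch (plain Java, illustrative only, not Flink code) that mimics the counting: each left-side occurrence contributes +1, each right-side occurrence -1, and a value is emitted `sum_val` times whenever `sum_val > 0`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExceptAllByCounting {

    // EXCEPT ALL keeps max(countLeft - countRight, 0) copies of each value.
    static List<Integer> exceptAll(List<Integer> left, List<Integer> right) {
        Map<Integer, Long> sum = new HashMap<>();
        for (int v : left)  sum.merge(v, 1L, Long::sum);   // vcol = +1
        for (int v : right) sum.merge(v, -1L, Long::sum);  // vcol = -1

        List<Integer> result = new ArrayList<>();
        sum.forEach((value, sumVal) -> {
            // plays the role of replicate_rows(sum_val, value); non-positive sums emit nothing
            for (long i = 0; i < sumVal; i++) {
                result.add(value);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        // (1,1,1,2) EXCEPT ALL (1,2,2): 1 survives 3-1=2 times, 2 survives 0 times
        System.out.println(exceptAll(List.of(1, 1, 1, 2), List.of(1, 2, 2))); // [1, 1]
    }
}
```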
[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r298000417

## File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/SetOperatorsTest.xml
## @@ -40,6 +40,34 @@
 GroupAggregate(groupBy=[c], select=[c])
 +- Exchange(distribution=[hash[f]])
    +- Calc(select=[f])
       +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
 ]]>

Review comment: this case should be in the second commit?
[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297973779

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala
## @@ -0,0 +1,124 @@
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val intersect: Intersect = call.rel(0)
+    val left = intersect.getInput(0)
+    val right = intersect.getInput(1)
+
+    val fields = Util.range(intersect.getRowType.getFieldCount)
+
+    val builder1 = call.builder

Review comment: builder1 => leftBuilder ?
[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297962213

## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
## @@ -0,0 +1,153 @@
+package org.apache.flink.table.runtime.batch.sql
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableConfigOptions
+import org.apache.flink.table.runtime.batch.sql.join.JoinITCaseHelper
+import org.apache.flink.table.runtime.batch.sql.join.JoinType.{JoinType, _}
+import org.apache.flink.table.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.runtime.utils.TestData._
+import org.apache.flink.table.runtime.utils.{BatchScalaTableEnvUtil, BatchTableEnvUtil, BatchTestBase}
+
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Before, Test}
+
+import java.util
+
+import scala.util.Random
+
+@RunWith(classOf[Parameterized])
+class SetOperatorsITCase(joinType: JoinType) extends BatchTestBase {
+
+  @Before
+  def before(): Unit = {
+    tEnv.getConfig.getConf.setInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, 3)
+    registerCollection("AllNullTable3", allNullData3, type3, "a, b, c")
+    registerCollection("SmallTable3", smallData3, type3, "a, b, c")
+    registerCollection("Table3", data3, type3, "a, b, c")
+    registerCollection("Table5", data5, type5, "a, b, c, d, e")
+    JoinITCaseHelper.disableOtherJoinOpForJoin(tEnv, joinType)
+  }
+
+  @Test
+  def testIntersect(): Unit = {
+    val data = List(
+      row(1, 1L, "Hi"),
+      row(2, 2L, "Hello"),
+      row(2, 2L, "Hello"),
+      row(3, 2L, "Hello world!")
+    )
+    val shuffleData = Random.shuffle(data)
+    BatchTableEnvUtil.registerCollection(
+      tEnv,
+      "T",
+      shuffleData,
+      new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO),
+      "a, b, c")
+
+    checkResult(
+      "SELECT c FROM SmallTable3 INTERSECT SELECT c FROM T",
+      Seq(row("Hi"), row("Hello")))
+  }
+
+  @Test
+  def testIntersectWithFilter(): Unit = {
+    checkResult(
+      "SELECT c FROM ((SELECT * FROM SmallTable3) INTERSECT (SELECT * FROM Table3)) WHERE a > 1",
+      Seq(row("Hello"), row("Hello world")))
+  }
+
+  @Test
+  def testExcept(): Unit = {
+    val data = List(row(1, 1L, "Hi"))
+    BatchTableEnvUtil.registerCollection(
+      tEnv,
+      "T", data,
+      new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO),
+      "a, b, c")
+
+    checkResult(
+      "SELECT c FROM SmallTable3 EXCEPT (SELECT c FROM T)",
+      Seq(row("Hello"), row("Hello world")))
+  }
+
+  @Test
+  def testExceptWithFilter(): Unit = {
+    checkResult(
+      "SELECT c FROM (" +
+        "SELECT * FROM SmallTable3 EXCEPT (SELECT a, b, d FROM Table5))" +
+        "WHERE b < 2",
+      Seq(row("Hi")))
+  }
+
+  @Test
+  def testIntersectWithNulls(): Unit = {
+    checkResult(
+      "SELECT c FROM AllNullTable3 INTERSECT SELECT c FROM AllNullTable3",
+      Seq(row(null)))
+  }
+
+  @Test
+  def testExceptWithNulls(): Unit = {
+    checkResult(
+      "SELECT c FROM AllNullTable3 EXCEPT SELECT c FROM AllNullTable3",
+      Seq())
+  }
+
+  @Test
+  def testIntersectAll(): Unit = {
+    BatchScalaTableEnvUtil.registerCollection(tEnv, "T1", Seq(1, 1, 1, 2, 2), "c")
+    BatchScalaTableEnvUtil.registerCollection(tEnv, "T2", Seq(1, 2, 2, 2, 3), "c")
+    checkResult(
+      "SELECT c FROM T1 INTERSECT ALL SELECT c FROM T2",
+      Seq(row(1), row(2), row(2)))
+  }
+
+  @Test
+  def testMinusAll(): Unit = {
+    BatchScalaTableEnvUtil.registerCollection(tEnv, "T2", Seq((1, 1L, "Hi")), "a, b, c")
+    val t1 = "SELECT * FROM SmallTable3"
+    val t2 = "SELECT * FROM T2"
+    checkResult(
+      s"SELECT c FROM (($t1 UNION ALL $t1 UNION ALL $t1) EXCEPT ALL ($t2 UNION ALL $t2))",
+      Seq(
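The expected results above follow multiset semantics: INTERSECT ALL keeps the minimum of the per-value counts on each side, EXCEPT ALL the left count minus the right count. A small stand-alone sketch of the INTERSECT ALL case (plain Java, illustrative only, not part of the Flink test suite):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IntersectAllByCounting {

    // INTERSECT ALL keeps min(countLeft, countRight) copies of each value.
    static List<Integer> intersectAll(List<Integer> left, List<Integer> right) {
        Map<Integer, Integer> leftCnt = new HashMap<>();
        Map<Integer, Integer> rightCnt = new HashMap<>();
        for (int v : left)  leftCnt.merge(v, 1, Integer::sum);
        for (int v : right) rightCnt.merge(v, 1, Integer::sum);

        List<Integer> result = new ArrayList<>();
        leftCnt.forEach((value, lc) -> {
            int copies = Math.min(lc, rightCnt.getOrDefault(value, 0)); // the min_count
            for (int i = 0; i < copies; i++) {
                result.add(value);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        // Mirrors testIntersectAll above: (1,1,1,2,2) INTERSECT ALL (1,2,2,2,3)
        // yields the multiset {1, 2, 2} (element order is not significant here).
        System.out.println(intersectAll(List.of(1, 1, 1, 2, 2), List.of(1, 2, 2, 2, 3)));
    }
}
```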
[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297963859

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceSetOpWithJoinRuleBase.scala
## @@ -55,4 +71,47 @@
     }
     conditions
   }
+
+  protected def replicateRows(
+      builder: RelBuilder, outputType: RelDataType, fields: util.List[Integer]): RelNode = {
+    // construct LogicalTableFunctionScan
+    val logicalType = toLogicalRowType(outputType)
+    val fieldNames = outputType.getFieldNames.toSeq.toArray
+    val fieldTypes = toLogicalRowType(outputType)
+        .getChildren.map(fromLogicalTypeToTypeInfo).toArray
+    val tf = new ReplicateRows(fieldTypes)
+    val resultType = fromLegacyInfoToDataType(new RowTypeInfo(fieldTypes, fieldNames))
+    val function = new TypedFlinkTableFunction(tf, resultType)
+    val typeFactory = builder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    val sqlFunction = new TableSqlFunction(
+      tf.functionIdentifier,
+      tf.toString,
+      tf,
+      resultType,
+      typeFactory,
+      function)
+
+    val scan = LogicalTableFunctionScan.create(
+      builder.peek().getCluster,

Review comment: Create a `cluster` variable, as `builder.peek().getCluster` is used twice.
[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297962835

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceSetOpWithJoinRuleBase.scala
## @@ -55,4 +71,47 @@
+    val logicalType = toLogicalRowType(outputType)
+    val fieldNames = outputType.getFieldNames.toSeq.toArray
+    val fieldTypes = toLogicalRowType(outputType)
+        .getChildren.map(fromLogicalTypeToTypeInfo).toArray

Review comment: `toLogicalRowType(outputType)` is duplicated; use the `logicalType` defined before.
[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297964789

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala
## @@ -0,0 +1,124 @@
+/**
+ * Replaces logical [[Intersect]] operator using a combination of Union, Aggregate
+ * and Generate operator.
+ *
+ * Input Query :

Review comment: Original Query ?
[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297966548

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala
## @@ -0,0 +1,124 @@
+ * Rewritten Query:
+ * {{{
+ * SELECT c1
+ * FROM (
+ *    SELECT replicate_row(min_count, c1)
+ *    FROM (
+ *      SELECT c1, If (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS min_count
+ *      FROM (
+ *        SELECT c1, count(vcol1) as vcol1_cnt, count(vcol2) as vcol2_cnt
+ *        FROM (
+ *          SELECT c1, true as vcol1, null as vcol2 FROM ut1
+ *          UNION ALL
+ *          SELECT c1, null as vcol1, true as vcol2 FROM ut2
+ *        ) AS union_all
+ *        GROUP BY c1
+ *        HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1

Review comment: Use `count(vcol1)` instead of `vcol1_cnt` (same for `vcol2_cnt`); alternatively, `vcol1_cnt >= 1 AND vcol2_cnt >= 1` can be used in a `where` clause on the outer query.
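To spell out the two alternatives the comment suggests (a hedged sketch; `tEnv` and the `union_all` input are assumed from the quoted javadoc, and the queries are illustrative, not the PR's actual rewrite):

```java
// Option 1: repeat the aggregate in HAVING, since standard SQL does not
// allow referencing the SELECT aliases vcol1_cnt / vcol2_cnt there.
Table viaHaving = tEnv.sqlQuery(
    "SELECT c1, COUNT(vcol1) AS vcol1_cnt, COUNT(vcol2) AS vcol2_cnt " +
    "FROM union_all GROUP BY c1 " +
    "HAVING COUNT(vcol1) >= 1 AND COUNT(vcol2) >= 1");

// Option 2: aggregate first, then filter on the aliases in an outer WHERE.
Table viaWhere = tEnv.sqlQuery(
    "SELECT * FROM (" +
    "  SELECT c1, COUNT(vcol1) AS vcol1_cnt, COUNT(vcol2) AS vcol2_cnt " +
    "  FROM union_all GROUP BY c1" +
    ") AS agg WHERE vcol1_cnt >= 1 AND vcol2_cnt >= 1");
```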
[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
URL: https://github.com/apache/flink/pull/8898#discussion_r297962742

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceSetOpWithJoinRuleBase.scala
## @@ -55,4 +71,47 @@
+  protected def replicateRows(

Review comment: The class name `ReplaceSetOpWithJoinRuleBase` is not proper now, because `RewriteIntersectAll` and `RewriteMinusAll` will not rewrite `IntersectAll` and `MinusAll` to a join. Please also add javadoc for this method.
[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner URL: https://github.com/apache/flink/pull/8898#discussion_r297964429 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.{GREATER_THAN, GREATER_THAN_OR_EQUAL, IF} + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.core.Intersect +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.calcite.util.Util + +import scala.collection.JavaConversions._ + +/** + * Replaces logical [[Intersect]] operator using a combination of Union, Aggregate + * and Generate operator. Review comment: what's the meaning of `Generate operator` ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner URL: https://github.com/apache/flink/pull/8898#discussion_r297974855 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.{GREATER_THAN, GREATER_THAN_OR_EQUAL, IF} + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.core.Intersect +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.calcite.util.Util + +import scala.collection.JavaConversions._ + +/** + * Replaces logical [[Intersect]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + *SELECT replicate_row(min_count, c1) + *FROM ( + * SELECT c1, If (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS min_count + * FROM ( + * SELECT c1, count(vcol1) as vcol1_cnt, count(vcol2) as vcol2_cnt + * FROM ( + * SELECT c1, true as vcol1, null as vcol2 FROM ut1 + * UNION ALL + * SELECT c1, null as vcol1, true as vcol2 FROM ut2 + * ) AS union_all + * GROUP BY c1 + * HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1 + * ) + * ) + * ) + * }}} + */ +class RewriteIntersectAll extends ReplaceSetOpWithJoinRuleBase( + classOf[Intersect], "RewriteIntersectAll") { + + override def matches(call: RelOptRuleCall): Boolean = { +val intersect: Intersect = call.rel(0) +!intersect.isDistinct + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val intersect: Intersect = call.rel(0) +val left = intersect.getInput(0) +val right = intersect.getInput(1) + +val fields = Util.range(intersect.getRowType.getFieldCount) + +val builder1 = call.builder +val boolType = builder1.getTypeFactory.createSqlType(SqlTypeName.BOOLEAN) +val leftWithAddedVirtualCols = builder1 Review comment: leftWithAddedVirtualCols => leftWithMarker ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner URL: https://github.com/apache/flink/pull/8898#discussion_r297985301 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteMinusAll.scala ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.GREATER_THAN + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.core.Minus +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.calcite.sql.`type`.SqlTypeName.BIGINT +import org.apache.calcite.util.Util + +import scala.collection.JavaConversions._ + +/** + * Replaces logical [[Minus]] operator using a combination of Union, Aggregate + * and Generate operator. Review comment: same as above This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner URL: https://github.com/apache/flink/pull/8898#discussion_r297969993 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.{GREATER_THAN, GREATER_THAN_OR_EQUAL, IF} + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.core.Intersect +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.calcite.util.Util + +import scala.collection.JavaConversions._ + +/** + * Replaces logical [[Intersect]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + *SELECT replicate_row(min_count, c1) + *FROM ( + * SELECT c1, If (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS min_count + * FROM ( + * SELECT c1, count(vcol1) as vcol1_cnt, count(vcol2) as vcol2_cnt + * FROM ( + * SELECT c1, true as vcol1, null as vcol2 FROM ut1 + * UNION ALL + * SELECT c1, null as vcol1, true as vcol2 FROM ut2 + * ) AS union_all + * GROUP BY c1 + * HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1 + * ) + * ) + * ) + * }}} + */ +class RewriteIntersectAll extends ReplaceSetOpWithJoinRuleBase( + classOf[Intersect], "RewriteIntersectAll") { + + override def matches(call: RelOptRuleCall): Boolean = { +val intersect: Intersect = call.rel(0) +!intersect.isDistinct Review comment: `intersect.all` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
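A minimal sketch of the suggested check, relying on Calcite's `SetOp.all` flag to carry the ALL semantics (replacing the `!intersect.isDistinct` test quoted above):

```scala
import org.apache.calcite.plan.RelOptRuleCall
import org.apache.calcite.rel.core.Intersect

// Match only INTERSECT ALL by reading the SetOp's `all` flag directly.
override def matches(call: RelOptRuleCall): Boolean = {
  val intersect: Intersect = call.rel(0)
  intersect.all
}
```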
[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner URL: https://github.com/apache/flink/pull/8898#discussion_r297973264 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.{GREATER_THAN, GREATER_THAN_OR_EQUAL, IF} + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.core.Intersect +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.calcite.util.Util + +import scala.collection.JavaConversions._ + +/** + * Replaces logical [[Intersect]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + *SELECT replicate_row(min_count, c1) + *FROM ( + * SELECT c1, If (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS min_count + * FROM ( + * SELECT c1, count(vcol1) as vcol1_cnt, count(vcol2) as vcol2_cnt + * FROM ( + * SELECT c1, true as vcol1, null as vcol2 FROM ut1 + * UNION ALL + * SELECT c1, null as vcol1, true as vcol2 FROM ut2 + * ) AS union_all + * GROUP BY c1 + * HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1 + * ) + * ) + * ) + * }}} + */ +class RewriteIntersectAll extends ReplaceSetOpWithJoinRuleBase( + classOf[Intersect], "RewriteIntersectAll") { + + override def matches(call: RelOptRuleCall): Boolean = { +val intersect: Intersect = call.rel(0) +!intersect.isDistinct + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val intersect: Intersect = call.rel(0) +val left = intersect.getInput(0) +val right = intersect.getInput(1) + +val fields = Util.range(intersect.getRowType.getFieldCount) + +val builder1 = call.builder +val boolType = builder1.getTypeFactory.createSqlType(SqlTypeName.BOOLEAN) +val leftWithAddedVirtualCols = builder1 +.push(left) +.project( + builder1.fields(fields) ++ Seq( +builder1.alias(builder1.literal(true), "vcol1"), Review comment: please choose a more meaningful name, such as `left_marker`? The same applies to `vcol2`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
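A small sketch of the suggested renaming applied to the projection quoted above (the marker names are illustrative, and the surrounding `builder1`, `left`, `fields` and `boolType` come from the PR's `onMatch` method):

```scala
// Tag rows from the left input with a non-null left marker and a null right marker.
val leftWithMarker = builder1
    .push(left)
    .project(
      builder1.fields(fields) ++ Seq(
        builder1.alias(builder1.literal(true), "left_marker"),
        builder1.alias(builder1.getRexBuilder.makeNullLiteral(boolType), "right_marker")))
    .build()
```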
[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner URL: https://github.com/apache/flink/pull/8898#discussion_r297974008 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.{GREATER_THAN, GREATER_THAN_OR_EQUAL, IF} + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.core.Intersect +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.calcite.util.Util + +import scala.collection.JavaConversions._ + +/** + * Replaces logical [[Intersect]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + *SELECT replicate_row(min_count, c1) + *FROM ( + * SELECT c1, If (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS min_count + * FROM ( + * SELECT c1, count(vcol1) as vcol1_cnt, count(vcol2) as vcol2_cnt + * FROM ( + * SELECT c1, true as vcol1, null as vcol2 FROM ut1 + * UNION ALL + * SELECT c1, null as vcol1, true as vcol2 FROM ut2 + * ) AS union_all + * GROUP BY c1 + * HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1 + * ) + * ) + * ) + * }}} + */ +class RewriteIntersectAll extends ReplaceSetOpWithJoinRuleBase( + classOf[Intersect], "RewriteIntersectAll") { + + override def matches(call: RelOptRuleCall): Boolean = { +val intersect: Intersect = call.rel(0) +!intersect.isDistinct + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val intersect: Intersect = call.rel(0) +val left = intersect.getInput(0) +val right = intersect.getInput(1) + +val fields = Util.range(intersect.getRowType.getFieldCount) + +val builder1 = call.builder +val boolType = builder1.getTypeFactory.createSqlType(SqlTypeName.BOOLEAN) +val leftWithAddedVirtualCols = builder1 +.push(left) +.project( + builder1.fields(fields) ++ Seq( +builder1.alias(builder1.literal(true), "vcol1"), +builder1.alias(builder1.getRexBuilder.makeNullLiteral(boolType), "vcol2"))) +.build() + +val builder2 = call.builder Review comment: builder2 => rightBuilder ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner URL: https://github.com/apache/flink/pull/8898#discussion_r297963892 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/ReplaceSetOpWithJoinRuleBase.scala ## @@ -55,4 +71,47 @@ abstract class ReplaceSetOpWithJoinRuleBase[T <: SetOp]( } conditions } + + protected def replicateRows( + builder: RelBuilder, outputType: RelDataType, fields: util.List[Integer]): RelNode = { +// construct LogicalTableFunctionScan +val logicalType = toLogicalRowType(outputType) +val fieldNames = outputType.getFieldNames.toSeq.toArray +val fieldTypes = toLogicalRowType(outputType) +.getChildren.map(fromLogicalTypeToTypeInfo).toArray +val tf = new ReplicateRows(fieldTypes) +val resultType = fromLegacyInfoToDataType(new RowTypeInfo(fieldTypes, fieldNames)) +val function = new TypedFlinkTableFunction(tf, resultType) +val typeFactory = builder.getTypeFactory.asInstanceOf[FlinkTypeFactory] +val sqlFunction = new TableSqlFunction( + tf.functionIdentifier, + tf.toString, + tf, + resultType, + typeFactory, + function) + +val scan = LogicalTableFunctionScan.create( + builder.peek().getCluster, + new util.ArrayList[RelNode](), + builder.call( +sqlFunction, +builder.fields(Util.range(fields.size() + 1))), + function.getElementType(null), + UserDefinedFunctionUtils.buildRelDataType( +builder.getTypeFactory, +logicalType, +fieldNames, +fieldNames.indices.toArray), + null) +builder.push(scan) + +// join Review comment: join => correlate This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner URL: https://github.com/apache/flink/pull/8898#discussion_r297999767 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.{GREATER_THAN, GREATER_THAN_OR_EQUAL, IF} + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.core.Intersect +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.calcite.util.Util + +import scala.collection.JavaConversions._ + +/** + * Replaces logical [[Intersect]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + *SELECT replicate_row(min_count, c1) + *FROM ( + * SELECT c1, If (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS min_count + * FROM ( + * SELECT c1, count(vcol1) as vcol1_cnt, count(vcol2) as vcol2_cnt + * FROM ( + * SELECT c1, true as vcol1, null as vcol2 FROM ut1 + * UNION ALL + * SELECT c1, null as vcol1, true as vcol2 FROM ut2 + * ) AS union_all + * GROUP BY c1 + * HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1 + * ) + * ) + * ) + * }}} + */ +class RewriteIntersectAll extends ReplaceSetOpWithJoinRuleBase( + classOf[Intersect], "RewriteIntersectAll") { + + override def matches(call: RelOptRuleCall): Boolean = { +val intersect: Intersect = call.rel(0) +!intersect.isDistinct + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val intersect: Intersect = call.rel(0) Review comment: `Intersect` and `Minus` may have more than two inputs, and that case cannot be supported for now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
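A sketch of a guard for that limitation (an assumption about how it could be handled, not code from the PR):

```scala
import org.apache.calcite.plan.RelOptRuleCall
import org.apache.calcite.rel.core.Intersect

// Restrict the rewrite to binary INTERSECT ALL; n-ary set operations fall through untouched.
override def matches(call: RelOptRuleCall): Boolean = {
  val intersect: Intersect = call.rel(0)
  intersect.all && intersect.getInputs.size() == 2
}
```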
[GitHub] [flink] knaufk commented on issue #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough
knaufk commented on issue #8903: [FLINK-12747][docs] Getting Started - Table API Example Walkthrough URL: https://github.com/apache/flink/pull/8903#issuecomment-506153339 Thanks a lot @sjwiesman for the contribution! A few general thoughts: * I really like that it's not the typical generic "wordcount"/"windowed wordcount"/etc. * I would be in favor of simply adding this as an example to `flink-examples-table` without a Maven archetype. Firstly, we already have quickstart modules to create program skeletons. Adding an additional option for the walkthroughs adds extra burden in terms of ongoing maintenance and might confuse users. Secondly, in my opinion a "code walkthrough" should focus on explaining the code of an example so that I can easily follow it. I don't necessarily expect the utilities to bootstrap a new program here. (We should have a separate dedicated subsection in the "Getting Started" section for the quickstarts/project setup.) * Personally, I think the example for a code walkthrough should be no-magic, and I should be able to understand as much as possible as a new user. Here, this means going without the custom `TableSource`, which is not easy to understand, and it is not clear why this can be a streaming and batch source at the same time, etc. Alternatively, one could simply read from a file of transactions (batch case) or read from a message queue (streaming case; maybe provide a minimal data generator). This would also go well with the future vision of "simply replace the source and your job turns into a streaming program", although we are not there yet. * Do we also want to add SQL to this example (probably as a section in the end implementing the same business logic)? Currently, we don't plan to have a dedicated "SQL Walkthrough". Do we need an SQL walkthrough at all? What do you think? What do others think? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner URL: https://github.com/apache/flink/pull/8898#discussion_r297975145 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.{GREATER_THAN, GREATER_THAN_OR_EQUAL, IF} + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.core.Intersect +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.calcite.util.Util + +import scala.collection.JavaConversions._ + +/** + * Replaces logical [[Intersect]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + *SELECT replicate_row(min_count, c1) + *FROM ( + * SELECT c1, If (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS min_count + * FROM ( + * SELECT c1, count(vcol1) as vcol1_cnt, count(vcol2) as vcol2_cnt + * FROM ( + * SELECT c1, true as vcol1, null as vcol2 FROM ut1 + * UNION ALL + * SELECT c1, null as vcol1, true as vcol2 FROM ut2 + * ) AS union_all + * GROUP BY c1 + * HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1 + * ) + * ) + * ) + * }}} + */ +class RewriteIntersectAll extends ReplaceSetOpWithJoinRuleBase( + classOf[Intersect], "RewriteIntersectAll") { + + override def matches(call: RelOptRuleCall): Boolean = { +val intersect: Intersect = call.rel(0) +!intersect.isDistinct + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val intersect: Intersect = call.rel(0) +val left = intersect.getInput(0) +val right = intersect.getInput(1) + +val fields = Util.range(intersect.getRowType.getFieldCount) + +val builder1 = call.builder +val boolType = builder1.getTypeFactory.createSqlType(SqlTypeName.BOOLEAN) +val leftWithAddedVirtualCols = builder1 +.push(left) +.project( + builder1.fields(fields) ++ Seq( +builder1.alias(builder1.literal(true), "vcol1"), +builder1.alias(builder1.getRexBuilder.makeNullLiteral(boolType), "vcol2"))) +.build() + +val builder2 = call.builder +val rightWithAddedVirtualCols = builder2 +.push(right) +.project( + builder2.fields(fields) ++ Seq( +builder2.alias(builder2.getRexBuilder.makeNullLiteral(boolType), "vcol1"), +builder2.alias(builder2.literal(true), "vcol2"))) +.build() + +val builder = call.builder Review comment: please add some comments for each step This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
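Following up on the request above, a sketch of what the step comments might look like, reusing the builder chain from the PR (the comment wording is an editor's assumption, not the author's):

```scala
builder
    .push(leftWithAddedVirtualCols)
    .push(rightWithAddedVirtualCols)
    // Step 1: UNION ALL the two inputs, each carrying its own non-null marker column.
    .union(true)
    // Step 2: group by the original fields and count the non-null markers from each side.
    .aggregate(
      builder.groupKey(builder.fields(fields)),
      builder.count(false, "vcol1_count", builder.field("vcol1")),
      builder.count(false, "vcol2_count", builder.field("vcol2")))
    // Step 3: keep only groups that occur in both inputs.
    .filter(builder.and(
      builder.call(GREATER_THAN_OR_EQUAL, builder.field("vcol1_count"), builder.literal(1)),
      builder.call(GREATER_THAN_OR_EQUAL, builder.field("vcol2_count"), builder.literal(1))))
    // Step 4: prepend the smaller of the two counts as the replication factor.
    .project(Seq(builder.call(
      IF,
      builder.call(GREATER_THAN, builder.field("vcol1_count"), builder.field("vcol2_count")),
      builder.field("vcol2_count"),
      builder.field("vcol1_count"))) ++ builder.fields(fields))
```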
[GitHub] [flink] godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner
godfreyhe commented on a change in pull request #8898: [FLINK-12936][table-planner-blink] Support intersect/minus all to blink planner URL: https://github.com/apache/flink/pull/8898#discussion_r297984695 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteIntersectAll.scala ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable.{GREATER_THAN, GREATER_THAN_OR_EQUAL, IF} + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.core.Intersect +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.calcite.util.Util + +import scala.collection.JavaConversions._ + +/** + * Replaces logical [[Intersect]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + *SELECT replicate_row(min_count, c1) + *FROM ( + * SELECT c1, If (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS min_count + * FROM ( + * SELECT c1, count(vcol1) as vcol1_cnt, count(vcol2) as vcol2_cnt + * FROM ( + * SELECT c1, true as vcol1, null as vcol2 FROM ut1 + * UNION ALL + * SELECT c1, null as vcol1, true as vcol2 FROM ut2 + * ) AS union_all + * GROUP BY c1 + * HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1 + * ) + * ) + * ) + * }}} + */ +class RewriteIntersectAll extends ReplaceSetOpWithJoinRuleBase( + classOf[Intersect], "RewriteIntersectAll") { + + override def matches(call: RelOptRuleCall): Boolean = { +val intersect: Intersect = call.rel(0) +!intersect.isDistinct + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val intersect: Intersect = call.rel(0) +val left = intersect.getInput(0) +val right = intersect.getInput(1) + +val fields = Util.range(intersect.getRowType.getFieldCount) + +val builder1 = call.builder +val boolType = builder1.getTypeFactory.createSqlType(SqlTypeName.BOOLEAN) +val leftWithAddedVirtualCols = builder1 +.push(left) +.project( + builder1.fields(fields) ++ Seq( +builder1.alias(builder1.literal(true), "vcol1"), +builder1.alias(builder1.getRexBuilder.makeNullLiteral(boolType), "vcol2"))) +.build() + +val builder2 = call.builder +val rightWithAddedVirtualCols = builder2 +.push(right) +.project( + builder2.fields(fields) ++ Seq( +builder2.alias(builder2.getRexBuilder.makeNullLiteral(boolType), "vcol1"), +builder2.alias(builder2.literal(true), "vcol2"))) +.build() + +val builder = call.builder +builder +.push(leftWithAddedVirtualCols) +.push(rightWithAddedVirtualCols) +.union(true) +.aggregate( + builder.groupKey(builder.fields(fields)), + 
builder.count(false, "vcol1_count", builder.field("vcol1")), + builder.count(false, "vcol2_count", builder.field("vcol2"))) +.filter(builder.and( + builder.call( +GREATER_THAN_OR_EQUAL, +builder.field("vcol1_count"), +builder.literal(1)), + builder.call( +GREATER_THAN_OR_EQUAL, +builder.field("vcol2_count"), +builder.literal(1)))) +.project(Seq(builder.call( + IF, + builder.call(GREATER_THAN, builder.field("vcol1_count"), builder.field("vcol2_count")), + builder.field("vcol2_count"), + builder.field("vcol1_count"))) ++ builder.fields(fields)) + +call.transformTo(replicateRows(builder, intersect.getRowType, fields)) Review comment: please define a variable for the `replicateRows(builder, intersect.getRowType, fields)` result, which could make the transformation easier to read. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
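A minimal sketch of the suggested extraction (the variable name is illustrative):

```scala
// Name the intermediate result instead of nesting the call inside transformTo.
val replicated: RelNode = replicateRows(builder, intersect.getRowType, fields)
call.transformTo(replicated)
```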
[GitHub] [flink] wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.
wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure. URL: https://github.com/apache/flink/pull/8880#discussion_r297994922 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java ## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.util.IOUtils; + +import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkState; + +final class FileChannelMemoryMappedBoundedData implements BoundedData { Review comment: It is better to supplement some java doc. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
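A sketch of the kind of class-level doc being asked for, inferred from the class name and the imports visible in the diff (the wording is an editor's assumption, not the author's):

```
/**
 * An implementation of BoundedData that writes buffers to a file via a FileChannel
 * (using gathering writes) and serves readers from memory-mapped regions of that
 * file, so the written data need not be copied back onto the heap for reading.
 */
```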
[jira] [Commented] (FLINK-13011) Release the PyFlink into PyPI
[ https://issues.apache.org/jira/browse/FLINK-13011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873799#comment-16873799 ] sunjincheng commented on FLINK-13011: - I would appreciate it if you could give some suggestions about how to release PyFlink into PyPI. :) [~Zentol] > Release the PyFlink into PyPI > - > > Key: FLINK-13011 > URL: https://issues.apache.org/jira/browse/FLINK-13011 > Project: Flink > Issue Type: Sub-task > Components: API / Python, Build System >Affects Versions: 1.9.0 >Reporter: sunjincheng >Priority: Major > > FLINK-12962 adds the ability to build a PyFlink distribution package, but we > have not yet released PyFlink to PyPI. The goal of this JIRA is to publish the > PyFlink distribution package built by FLINK-12962 to PyPI. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] sunjincheng121 commented on issue #8863: [FLINK-12962][python] Allows pyflink to be pip installed.
sunjincheng121 commented on issue #8863: [FLINK-12962][python] Allows pyflink to be pip installed. URL: https://github.com/apache/flink/pull/8863#issuecomment-506150106 I have opened the JIRA(https://issues.apache.org/jira/browse/FLINK-13011) for Release the PyFlink into PyPI. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-13011) Release the PyFlink into PyPI
sunjincheng created FLINK-13011: --- Summary: Release the PyFlink into PyPI Key: FLINK-13011 URL: https://issues.apache.org/jira/browse/FLINK-13011 Project: Flink Issue Type: Sub-task Components: API / Python, Build System Affects Versions: 1.9.0 Reporter: sunjincheng FLINK-12962 adds the ability to build a PyFlink distribution package, but we have not yet released PyFlink to PyPI. The goal of this JIRA is to publish the PyFlink distribution package built by FLINK-12962 to PyPI. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.
sunjincheng121 commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed. URL: https://github.com/apache/flink/pull/8863#discussion_r297963075 ## File path: flink-python/pyflink/version.py ## @@ -16,4 +16,4 @@ # limitations under the License. -__version__ = "0.1.0" +__version__ = "1.9.dev0" Review comment: It would be better to add an explanation of the version naming rule, and add the link: https://www.python.org/dev/peps/pep-0440. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.
sunjincheng121 commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed. URL: https://github.com/apache/flink/pull/8863#discussion_r297984293 ## File path: flink-dist/src/main/flink-bin/bin/find-flink-home.sh ## @@ -0,0 +1,28 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +CURRENT_DIR="$( cd "$(dirname "$0")" ; pwd -P )" +FIND_FLINK_HOME_PYTHON_SCRIPT="$CURRENT_DIR/find_flink_home.py" + +if [ ! -f "$FIND_FLINK_HOME_PYTHON_SCRIPT" ]; then +export FLINK_HOME="$( cd "$CURRENT_DIR"/.. ; pwd -P )" +else +PYFLINK_PYTHON="${PYFLINK_PYTHON:-"python"}" +export FLINK_HOME=$($PYFLINK_PYTHON "$FIND_FLINK_HOME_PYTHON_SCRIPT") Review comment: remove `$PYFLINK_PYTHON` and use only `$FIND_FLINK_HOME_PYTHON_SCRIPT`, since we already added a `/usr/bin/env python` shebang to `find_flink_home.py`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.
sunjincheng121 commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed. URL: https://github.com/apache/flink/pull/8863#discussion_r297983205 ## File path: docs/flinkDev/building.md ## @@ -58,6 +58,17 @@ mvn clean install -DskipTests -Dfast The default build adds a Flink-specific JAR for Hadoop 2, to allow using Flink with HDFS and YARN. +## Build PyFlink + +If you want to build a PyFlink package that can be used for pip installation, you need to build Flink jars first, as described in [Build Flink](##Build Flink). +Then go to the root directory of flink source code and run this command to build a sdist package: + +{% highlight bash %} +cd flink-python; python setup.py sdist +{% endhighlight %} + +The sdist package will be found under `./flink-python/dist/`. Review comment: It would be better to also describe how to install `pyflink`. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.
sunjincheng121 commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed. URL: https://github.com/apache/flink/pull/8863#discussion_r297963181 ## File path: flink-python/pyflink/shell.py ## @@ -131,8 +139,8 @@ * t.select("a + 1, b, c").insert_into("stream_sink") * * st_env.exec_env().execute() - ''' -print(welcome_msg) +''' +utf8_out.write(welcome_msg) Review comment: Good catch! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed.
sunjincheng121 commented on a change in pull request #8863: [FLINK-12962][python] Allows pyflink to be pip installed. URL: https://github.com/apache/flink/pull/8863#discussion_r297971249 ## File path: flink-dist/src/main/flink-bin/bin/config.sh ## @@ -296,7 +296,10 @@ bin=`dirname "$target"` SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P` # Define the main directory of the flink installation -FLINK_HOME=`dirname "$SYMLINK_RESOLVED_BIN"` +# If config.sh is called by pyflink-shell.sh in python bin directory(pip installed), use the _PYFLINK_HOME Review comment: `use the _PYFLINK_HOME` -> `then do not need to set the FLINK_HOME here.` ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-4399) Add support for oversized messages
[ https://issues.apache.org/jira/browse/FLINK-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873791#comment-16873791 ] Biao Liu commented on FLINK-4399: - Recently, at least two users have run into the oversized-message issue. I'll try to resolve this. Since a general resolution is a bit complicated, I will attach a design doc in the next few days to have a discussion first. > Add support for oversized messages > -- > > Key: FLINK-4399 > URL: https://issues.apache.org/jira/browse/FLINK-4399 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination > Environment: FLIP-6 feature branch >Reporter: Stephan Ewen >Assignee: Biao Liu >Priority: Major > Labels: flip-6 > > Currently, messages larger than the maximum Akka Framesize cause an error > when being transported. We should add a way to pass messages that are larger > than the Framesize, as may happen for: > - {{collect()}} calls that collect large data sets (via accumulators) > - Job submissions and operator deployments where the functions closures are > large (for example because it contains large pre-loaded data) > - Function restore in cases where restored state is larger than > checkpointed state (union state) > I suggest to use the {{BlobManager}} to transfer large payload. > - On the sender side, oversized messages are stored under a transient blob > (which is deleted after first retrieval, or after a certain number of minutes) > - The sender sends a "pointer to blob message" instead. > - The receiver grabs the message from the blob upon receiving the pointer > message > The RPC Service should be optionally initializable with a "large message > handler" which is internally the {{BlobManager}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-13010) Refactor the process of SchedulerNG#requestPartitionState
zhijiang created FLINK-13010: Summary: Refactor the process of SchedulerNG#requestPartitionState Key: FLINK-13010 URL: https://issues.apache.org/jira/browse/FLINK-13010 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination, Runtime / Network Reporter: zhijiang Assignee: zhijiang Currently `requestPartitionState` is mainly used to query the partition state when the consumer receives a `PartitionNotFoundException` while requesting a partition. Actually we do not have a concept of partition state at the moment, and `requestPartitionState` would return the corresponding producer's state as a result, so there is a contradiction here. My suggestion is to refactor the method as `requestPartitionProducerState`; then we do not need to pass the `IntermediateDataSetID` and `ResultPartitionID` arguments for finding the corresponding execution attempt. We could pass only the `ExecutionAttemptID` to the method, and the corresponding execution attempt could easily be found from the mapping in the `ExecutionGraph`. To do so, we could further remove `IntermediateDataSetID` from `SingleInputGate` and might replace `IntermediateDataSetID` with `InputGateID` in `InputGateDeploymentDescriptor`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-13009) YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots throws NPE on Travis
[ https://issues.apache.org/jira/browse/FLINK-13009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Congxian Qiu(klion26) updated FLINK-13009: -- Description: The test {{YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots}} throws NPE when Travis build. From the code of hadoop-2.8.3[1], seems the {{rmContext}} is null {code:java} // Only add in the running containers if this is the active attempt. RMAppAttempt currentAttempt = rmContext.getRMApps() .get(attemptId.getApplicationId()).getCurrentAppAttempt(); {code} log [https://api.travis-ci.org/v3/job/550689578/log.txt] [1] https://github.com/apache/hadoop/blob/b3fe56402d908019d99af1f1f4fc65cb1d1436a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java#L128 was: The test {{YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots}} throws NPE when Travis build. From the code of hadoop-2.8.3, seems the {{rmContext}} is null {code:java} // code from https://github.com/apache/hadoop/blob/b3fe56402d908019d99af1f1f4fc65cb1d1436a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java#L128 // Only add in the running containers if this is the active attempt. RMAppAttempt currentAttempt = rmContext.getRMApps() .get(attemptId.getApplicationId()).getCurrentAppAttempt(); {code} log [https://api.travis-ci.org/v3/job/550689578/log.txt] > YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots > throws NPE on Travis > - > > Key: FLINK-13009 > URL: https://issues.apache.org/jira/browse/FLINK-13009 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: Congxian Qiu(klion26) >Priority: Major > > The test > {{YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots}} > throws NPE when Travis build. > From the code of hadoop-2.8.3[1], seems the {{rmContext}} is null > {code:java} > // Only add in the running containers if this is the active attempt. > RMAppAttempt currentAttempt = rmContext.getRMApps() > .get(attemptId.getApplicationId()).getCurrentAppAttempt(); > {code} > > log [https://api.travis-ci.org/v3/job/550689578/log.txt] > [1] > https://github.com/apache/hadoop/blob/b3fe56402d908019d99af1f1f4fc65cb1d1436a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java#L128 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-13009) YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots throws NPE on Travis
[ https://issues.apache.org/jira/browse/FLINK-13009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Congxian Qiu(klion26) updated FLINK-13009: -- Description: The test {{YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots}} throws NPE on Travis. From the code of hadoop-2.8.3[1], seems the {{rmContext}} is null {code:java} // Only add in the running containers if this is the active attempt. RMAppAttempt currentAttempt = rmContext.getRMApps() .get(attemptId.getApplicationId()).getCurrentAppAttempt(); {code} log [https://api.travis-ci.org/v3/job/550689578/log.txt] [1] [https://github.com/apache/hadoop/blob/b3fe56402d908019d99af1f1f4fc65cb1d1436a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java#L128] was: The test {{YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots}} throws NPE when Travis build. From the code of hadoop-2.8.3[1], seems the {{rmContext}} is null {code:java} // Only add in the running containers if this is the active attempt. RMAppAttempt currentAttempt = rmContext.getRMApps() .get(attemptId.getApplicationId()).getCurrentAppAttempt(); {code} log [https://api.travis-ci.org/v3/job/550689578/log.txt] [1] https://github.com/apache/hadoop/blob/b3fe56402d908019d99af1f1f4fc65cb1d1436a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java#L128 > YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots > throws NPE on Travis > - > > Key: FLINK-13009 > URL: https://issues.apache.org/jira/browse/FLINK-13009 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: Congxian Qiu(klion26) >Priority: Major > > The test > {{YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots}} > throws NPE on Travis. > From the code of hadoop-2.8.3[1], seems the {{rmContext}} is null > {code:java} > // Only add in the running containers if this is the active attempt. > RMAppAttempt currentAttempt = rmContext.getRMApps() > .get(attemptId.getApplicationId()).getCurrentAppAttempt(); > {code} > > log [https://api.travis-ci.org/v3/job/550689578/log.txt] > [1] > [https://github.com/apache/hadoop/blob/b3fe56402d908019d99af1f1f4fc65cb1d1436a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java#L128] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-4399) Add support for oversized messages
[ https://issues.apache.org/jira/browse/FLINK-4399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Biao Liu reassigned FLINK-4399: --- Assignee: Biao Liu > Add support for oversized messages > -- > > Key: FLINK-4399 > URL: https://issues.apache.org/jira/browse/FLINK-4399 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination > Environment: FLIP-6 feature branch >Reporter: Stephan Ewen >Assignee: Biao Liu >Priority: Major > Labels: flip-6 > > Currently, messages larger than the maximum Akka Framesize cause an error > when being transported. We should add a way to pass messages that are larger > than the Framesize, as may happen for: > - {{collect()}} calls that collect large data sets (via accumulators) > - Job submissions and operator deployments where the functions closures are > large (for example because it contains large pre-loaded data) > - Function restore in cases where restored state is larger than > checkpointed state (union state) > I suggest to use the {{BlobManager}} to transfer large payload. > - On the sender side, oversized messages are stored under a transient blob > (which is deleted after first retrieval, or after a certain number of minutes) > - The sender sends a "pointer to blob message" instead. > - The receiver grabs the message from the blob upon receiving the pointer > message > The RPC Service should be optionally initializable with a "large message > handler" which is internally the {{BlobManager}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot commented on issue #8906: [FLINK-13008]fix the findbugs warning in AggregationsFunctio.scala
flinkbot commented on issue #8906: [FLINK-13008]fix the findbugs warning in AggregationsFunctio.scala URL: https://github.com/apache/flink/pull/8906#issuecomment-506144046 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-13008) fix the findbugs warning in AggregationsFunction
[ https://issues.apache.org/jira/browse/FLINK-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-13008: --- Labels: pull-request-available (was: ) > fix the findbugs warning in AggregationsFunction > > > Key: FLINK-13008 > URL: https://issues.apache.org/jira/browse/FLINK-13008 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Reporter: wangpeibin >Assignee: wangpeibin >Priority: Minor > Labels: pull-request-available > > The goal is to fix the findbugs warning in > *[GeneratedAggregations|https://github.com/apache/flink/blob/e5cadf69d176181af7d097928bf2a6cade1b6c76/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala]* > > In the remove function at line 152, it returns a new UnsupportedOperationException() rather than throwing it, which causes a findbugs warning > > {code:java} > override def remove(): Unit = new java.lang.UnsupportedOperationException > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
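Editor's note: this bug class is easy to miss in review because the code still compiles: constructing an exception without throwing it is a no-op. The quoted Flink code is Scala, but the pattern is identical in Java; a minimal sketch of the difference:

{code:java}
public class RemoveSketch {

    // Buggy: allocates the exception and discards it, so the call silently succeeds.
    static void removeBuggy() {
        new UnsupportedOperationException("remove is not supported");
    }

    // Fixed: actually throws, signalling that the operation is unsupported.
    static void removeFixed() {
        throw new UnsupportedOperationException("remove is not supported");
    }

    public static void main(String[] args) {
        removeBuggy(); // returns normally -- this is what the findbugs warning flags
        removeFixed(); // throws UnsupportedOperationException
    }
}
{code}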
[GitHub] [flink] klion26 commented on issue #8895: [FLINK-12536][Runtime/Network]Make BufferOrEventSequence#getNext non blocking
klion26 commented on issue #8895: [FLINK-12536][Runtime/Network]Make BufferOrEventSequence#getNext non blocking URL: https://github.com/apache/flink/pull/8895#issuecomment-506143844 The Travis build failure seems unrelated to this change; I have filed an [issue](https://issues.apache.org/jira/browse/FLINK-13009) to track it. ``` Test testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots(org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase) failed with: java.lang.NullPointerException: java.lang.NullPointerException at org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics.getAggregateAppResourceUsage(RMAppAttemptMetrics.java:128) at org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl.getApplicationResourceUsageReport(RMAppAttemptImpl.java:900) at org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl.createAndGetApplicationReport(RMAppImpl.java:660) at org.apache.hadoop.yarn.server.resourcemanager.ClientRMService.getApplications(ClientRMService.java:930) at org.apache.hadoop.yarn.api.impl.pb.service.ApplicationClientProtocolPBServiceImpl.getApplications(ApplicationClientProtocolPBServiceImpl.java:273) at org.apache.hadoop.yarn.proto.ApplicationClientProtocol$ApplicationClientProtocolService$2.callBlockingMethod(ApplicationClientProtocol.java:507) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989) at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:847) at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:790) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2486) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wangpeibin713 opened a new pull request #8906: [FLINK-13008]fix the findbugs warning in AggregationsFunctio.scala
wangpeibin713 opened a new pull request #8906: [FLINK-13008]fix the findbugs warning in AggregationsFunctio.scala URL: https://github.com/apache/flink/pull/8906 ## What is the purpose of the change - The goal is to fix the findbugs warning in AggregationsFunctio.scala https://issues.apache.org/jira/browse/FLINK-13008 ## Brief change log - *throw the UnsupportedOperationException rather than return it* ## Verifying this change * This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-13009) YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots thro
[ https://issues.apache.org/jira/browse/FLINK-13009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Congxian Qiu(klion26) updated FLINK-13009: -- Summary: YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots throws NPE on Travis (was: YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots) > YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots > throws NPE on Travis > - > > Key: FLINK-13009 > URL: https://issues.apache.org/jira/browse/FLINK-13009 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: Congxian Qiu(klion26) >Priority: Major > > The test > {{YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots}} > throws an NPE during the Travis build. > From the code of hadoop-2.8.3, it seems that {{rmContext}} is null > {code:java} > // code from > https://github.com/apache/hadoop/blob/b3fe56402d908019d99af1f1f4fc65cb1d1436a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java#L128 > // Only add in the running containers if this is the active attempt. > RMAppAttempt currentAttempt = rmContext.getRMApps() > .get(attemptId.getApplicationId()).getCurrentAppAttempt(); > {code} > > log [https://api.travis-ci.org/v3/job/550689578/log.txt] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-13009) YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots
Congxian Qiu(klion26) created FLINK-13009: - Summary: YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots Key: FLINK-13009 URL: https://issues.apache.org/jira/browse/FLINK-13009 Project: Flink Issue Type: Test Components: Tests Reporter: Congxian Qiu(klion26) The test {{YARNSessionCapacitySchedulerITCase#testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterfaceAndDynamicPropertiesAndYarnApplicationNameAndTaskManagerSlots}} throws an NPE during the Travis build. From the code of hadoop-2.8.3, it seems that {{rmContext}} is null {code:java} // code from https://github.com/apache/hadoop/blob/b3fe56402d908019d99af1f1f4fc65cb1d1436a2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java#L128 // Only add in the running containers if this is the active attempt. RMAppAttempt currentAttempt = rmContext.getRMApps() .get(attemptId.getApplicationId()).getCurrentAppAttempt(); {code} log [https://api.travis-ci.org/v3/job/550689578/log.txt] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] wuchong commented on issue #8530: [FLINK-12611][table-planner-blink] Make time indicator nullable in blink
wuchong commented on issue #8530: [FLINK-12611][table-planner-blink] Make time indicator nullable in blink URL: https://github.com/apache/flink/pull/8530#issuecomment-506143355 This looks good to me. +1 to merge. I will rebase this and merge it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.
wsry commented on a change in pull request #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure. URL: https://github.com/apache/flink/pull/8880#discussion_r297963987 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; + +/** + * Putting and getting of a sequence of buffers to/from a FileChannel or a ByteBuffer. + * This class handles the headers, length encoding, memory slicing. + * + * The encoding is the same across FileChannel and ByteBuffer, so this class can + * write to a file and read from the byte buffer that results from mapping this file to memory. + */ +final class BufferReaderWriterUtil { + + static final int HEADER_LENGTH = 8; + + static final int HEADER_VALUE_IS_BUFFER = 0; + + static final int HEADER_VALUE_IS_EVENT = 1; + + // + // ByteBuffer read / write + // + + static boolean writeBuffer(Buffer buffer, ByteBuffer memory) { + final int bufferSize = buffer.getSize(); + + if (memory.remaining() < bufferSize + HEADER_LENGTH) { + return false; + } + + memory.putInt(buffer.isBuffer() ? HEADER_VALUE_IS_BUFFER : HEADER_VALUE_IS_EVENT); + memory.putInt(bufferSize); + memory.put(buffer.getNioBufferReadable()); + return true; + } + + @Nullable + static Buffer sliceNextBuffer(ByteBuffer memory) { + final int remaining = memory.remaining(); + + // we only check the correct case where data is exhausted + // all other cases can only occur if our write logic is wrong and will already throw + // buffer underflow exceptions which will cause the read to fail. 
+ if (remaining == 0) { + return null; + } + + final int header = memory.getInt(); + final int size = memory.getInt(); + + memory.limit(memory.position() + size); + ByteBuffer buf = memory.slice(); + memory.position(memory.limit()); + memory.limit(memory.capacity()); + + MemorySegment memorySegment = MemorySegmentFactory.wrapOffHeapMemory(buf); + + return bufferFromMemorySegment( + memorySegment, + FreeingBufferRecycler.INSTANCE, + size, + header == HEADER_VALUE_IS_EVENT); + } + + // + // ByteChannel read / write + // + + static long writeToByteChannel( + FileChannel channel, + Buffer buffer, + ByteBuffer[] arrayWithHeaderBuffer) throws IOException { + + final ByteBuffer headerBuffer = arrayWithHeaderBuffer[0]; + headerBuffer.clear(); + headerBuffer.putInt(buffer.isBuffer() ? HEADER_VALUE_IS_BUFFER : HEADER_VALUE_IS_EVENT); + headerBuffer.putInt(buffer.getSize()); + headerBuffer.flip();
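Editor's note: for readers skimming the diff above, the class implements simple length-prefixed framing: an 8-byte header (a 4-byte buffer/event flag plus a 4-byte payload length) followed by the payload, with the same encoding whether written to a FileChannel or read back from a mapped ByteBuffer. A self-contained sketch of that round trip, simplified under the assumption that memory segments and event semantics can be ignored:

```java
import java.nio.ByteBuffer;

public class FramingSketch {

    static final int HEADER_LENGTH = 8; // 4-byte type flag + 4-byte length, as in the diff

    // Returns false when the target has no room, mirroring writeBuffer() above.
    static boolean writeFrame(ByteBuffer target, byte[] payload, boolean isEvent) {
        if (target.remaining() < payload.length + HEADER_LENGTH) {
            return false;
        }
        target.putInt(isEvent ? 1 : 0); // header: record type flag
        target.putInt(payload.length);  // header: payload length
        target.put(payload);
        return true;
    }

    // Returns null when the data is exhausted, mirroring sliceNextBuffer() above.
    static byte[] readFrame(ByteBuffer source) {
        if (source.remaining() == 0) {
            return null;
        }
        source.getInt();               // type flag (ignored in this sketch)
        int size = source.getInt();
        byte[] payload = new byte[size];
        source.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        ByteBuffer memory = ByteBuffer.allocate(64);
        writeFrame(memory, "hello".getBytes(), false);
        memory.flip();
        System.out.println(new String(readFrame(memory))); // hello
    }
}
```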
[jira] [Commented] (FLINK-12848) Method equals() in RowTypeInfo should consider fieldsNames
[ https://issues.apache.org/jira/browse/FLINK-12848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873786#comment-16873786 ] Jark Wu commented on FLINK-12848: - Hi [~aloyszhang] [~yanghua] [~enricoc], thanks for reporting this. However, I don't think it's safe to compare field names in RowTypeInfo#equals. As [~enricoc] mentioned, we explicitly explained in FLINK-9444 that RowTypeInfo#equals should not check field names. RowTypeInfo is information that describes the structure of {{org.apache.flink.types.Row}}, and there are no field names in Row. At runtime (e.g. during serialization/deserialization), the field names also don't matter. We heavily used TypeInformation (including #equals) in SQL before, and this led to a lot of problems. That's why [~tiwalter] recently introduced a well-defined type system in SQL. This work is still ongoing, and we will eventually replace TypeInformation with the new type system throughout SQL/Table. At that point, the problems you encountered should be fixed. [~aloyszhang] could you attach the implementation of {{SimpleProcessionTimeSource}}? Without it I can't reproduce the exception you mentioned. We can try it again to see whether it has been solved. > Method equals() in RowTypeInfo should consider fieldsNames > -- > > Key: FLINK-12848 > URL: https://issues.apache.org/jira/browse/FLINK-12848 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.7.2 >Reporter: aloyszhang >Assignee: aloyszhang >Priority: Major > Labels: pull-request-available > Time Spent: 0.5h > Remaining Estimate: 0h > > Since `RowTypeInfo#equals()` does not consider the fieldNames, an error in the field names may occur when processing data with a RowTypeInfo type. > {code:java} > String [] fields = new String []{"first", "second"}; > TypeInformation[] types = new TypeInformation[]{ > Types.ROW_NAMED(new String[]{"first001"}, Types.INT), > Types.ROW_NAMED(new String[]{"second002"}, Types.INT) }; > StreamExecutionEnvironment execEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment env = > StreamTableEnvironment.getTableEnvironment(execEnv); > SimpleProcessionTimeSource streamTableSource = new > SimpleProcessionTimeSource(fields, types); > env.registerTableSource("testSource", streamTableSource); > Table sourceTable = env.scan("testSource"); > System.out.println("Source table schema : "); > sourceTable.printSchema(); > {code} > The table schema will be > {code:java} > Source table schema : > root > |-- first: Row(first001: Integer) > |-- second: Row(first001: Integer) > |-- timestamp: TimeIndicatorTypeInfo(proctime) > {code} > the second field has the same name as the first field. > So, we should consider the field names in RowTypeInfo#equals() > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
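Editor's note: the equality behavior Jark describes can be reproduced in a few lines. A sketch against the Flink 1.x Java API, assuming {{RowTypeInfo}}'s two-argument (types, field names) constructor; see FLINK-9444 for why field names are deliberately excluded from equality:

{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class RowTypeInfoEqualsSketch {
    public static void main(String[] args) {
        RowTypeInfo a = new RowTypeInfo(
                new TypeInformation<?>[]{Types.INT}, new String[]{"first001"});
        RowTypeInfo b = new RowTypeInfo(
                new TypeInformation<?>[]{Types.INT}, new String[]{"second002"});

        // equals() intentionally ignores field names: only the field types matter,
        // because Row itself carries no names and serialization does not use them.
        System.out.println(a.equals(b)); // true
    }
}
{code}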
[GitHub] [flink] klion26 commented on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint
klion26 commented on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint URL: https://github.com/apache/flink/pull/8617#issuecomment-506141650 Hi @yumengz5, what do you mean by "the task will block all the time"? Would you mind explaining it more? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] aloyszhang commented on a change in pull request #8902: [hotfix][docs] fix wrong link in streamfile_sink
aloyszhang commented on a change in pull request #8902: [hotfix][docs] fix wrong link in streamfile_sink URL: https://github.com/apache/flink/pull/8902#discussion_r297962271 ## File path: docs/dev/connectors/streamfile_sink.md ## @@ -24,7 +24,7 @@ under the License. --> This connector provides a Sink that writes partitioned files to filesystems -supported by the [Flink `FileSystem` abstraction]({{ site.baseurl}}/ops/filesystems.html). +supported by the [Flink `FileSystem` abstraction]({{ site.baseurl}}/ops/filesystems). Review comment: Hi @zentol, I have been working on translating this doc to Chinese. So, I will fix the Chinese version in [FLINK-12944](https://issues.apache.org/jira/projects/FLINK/issues/FLINK-12944?filter=allissues) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-13008) fix the findbugs warning in AggregationsFunction
wangpeibin created FLINK-13008: -- Summary: fix the findbugs warning in AggregationsFunction Key: FLINK-13008 URL: https://issues.apache.org/jira/browse/FLINK-13008 Project: Flink Issue Type: Improvement Components: Table SQL / Runtime Reporter: wangpeibin Assignee: wangpeibin The goal is to fix the findbugs warning in *[GeneratedAggregations|https://github.com/apache/flink/blob/e5cadf69d176181af7d097928bf2a6cade1b6c76/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala]* In the remove function at line 152, it returns a new UnsupportedOperationException() rather than throwing it, which causes a findbugs warning {code:java} override def remove(): Unit = new java.lang.UnsupportedOperationException {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] yumengz5 commented on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint
yumengz5 commented on issue #8617: [FLINK-12619][StateBackend]Support TERMINATE/SUSPEND Job with Checkpoint URL: https://github.com/apache/flink/pull/8617#issuecomment-506135399 What if the checkpoint times out? Will the task block all the time? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12609) Align the Python data types with Java
[ https://issues.apache.org/jira/browse/FLINK-12609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12609: --- Labels: pull-request-available (was: ) > Align the Python data types with Java > - > > Key: FLINK-12609 > URL: https://issues.apache.org/jira/browse/FLINK-12609 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > > As discussed in > https://github.com/apache/flink/pull/8420#issuecomment-495444623, currently, > there are some data types defined in Java not supported in Python such as > TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE, etc. We should > support them in Python once these types have been fully supported in Java. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] asfgit closed pull request #8847: [FLINK-12609][python] Align the Python data types with Java
asfgit closed pull request #8847: [FLINK-12609][python] Align the Python data types with Java URL: https://github.com/apache/flink/pull/8847 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-12609) Align the Python data types with Java
[ https://issues.apache.org/jira/browse/FLINK-12609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-12609. --- Resolution: Fixed Fix Version/s: 1.9.0 Fixed in master: 0ed394ef0908a88502d198a90f49e04b123a4eda > Align the Python data types with Java > - > > Key: FLINK-12609 > URL: https://issues.apache.org/jira/browse/FLINK-12609 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Fix For: 1.9.0 > > > As discussed in > https://github.com/apache/flink/pull/8420#issuecomment-495444623, currently, > there are some data types defined in Java not supported in Python such as > TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE, etc. We should > support them in Python once these types have been fully supported in Java. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-13007) Integrate Flink's FunctionCatalog with Catalog APIs
Bowen Li created FLINK-13007: Summary: Integrate Flink's FunctionCatalog with Catalog APIs Key: FLINK-13007 URL: https://issues.apache.org/jira/browse/FLINK-13007 Project: Flink Issue Type: Improvement Components: Table SQL / API, Table SQL / Planner Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0 Integrate Flink's FunctionCatalog with Catalog APIs so that user-defined functions can be persisted through catalog APIs -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12999) Can't generate valid execution plan for "SELECT uuid() FROM VALUES(1,2,3) T(a, b, c)"
[ https://issues.apache.org/jira/browse/FLINK-12999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-12999: - Summary: Can't generate valid execution plan for "SELECT uuid() FROM VALUES(1,2,3) T(a, b, c)" (was: Can't generate valid execution plan for "SELECT uuid() FROM VALUES(1) T(a)") > Can't generate valid execution plan for "SELECT uuid() FROM VALUES(1,2,3) > T(a, b, c)" > - > > Key: FLINK-12999 > URL: https://issues.apache.org/jira/browse/FLINK-12999 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.9.0 >Reporter: Zhenghua Gao >Assignee: godfrey he >Priority: Major > > The ERROR message is: > > org.apache.flink.table.api.TableException: Cannot generate a valid execution > plan for the given query: > LogicalSink(fields=[EXPR$0]) > +- LogicalProject(EXPR$0=[UUID()]) > +- LogicalValues(tuples=[[\{ 1, 2, 3 }]]) > This exception indicates that the query uses an unsupported SQL feature. > Please check the documentation for the set of currently supported SQL > features. > at > org.apache.flink.table.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:72) > at > org.apache.flink.table.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > at > org.apache.flink.table.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > at > org.apache.flink.table.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:82) > at > org.apache.flink.table.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:51) > at > org.apache.flink.table.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:39) > at > org.apache.flink.table.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:39) > at scala.collection.immutable.List.foreach(List.scala:392) > at > org.apache.flink.table.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:39) > at > org.apache.flink.table.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:65) > at > org.apache.flink.table.api.TableEnvironment.optimize(TableEnvironment.scala:251) > at > org.apache.flink.table.api.TableEnvironment.compileToExecNodePlan(TableEnvironment.scala:200) > at > org.apache.flink.table.api.TableEnvironment.compile(TableEnvironment.scala:184) > at > org.apache.flink.table.api.TableEnvironment.generateStreamGraph(TableEnvironment.scala:155) > at > 
org.apache.flink.table.api.BatchTableEnvironment.execute(BatchTableEnvironment.scala:93) > at > org.apache.flink.table.api.TableEnvironment.execute(TableEnvironment.scala:136) > at > org.apache.flink.table.runtime.utils.BatchTableEnvUtil$.collect(BatchTableEnvUtil.scala:55) > at > org.apache.flink.table.runtime.utils.TableUtil$.collectSink(TableUtil.scala:60) > at > org.apache.flink.table.runtime.utils.TableUtil$.collect(TableUtil.scala:41) > at > org.apache.flink.table.runtime.utils.BatchTestBase.executeQuery(BatchTestBase.scala:308) > at > org.apache.flink.table.runtime.utils.BatchTestBase.check(BatchTestBase.scala:164) > at > org.apache.flink.table.runtime.utils.BatchTestBase.checkResult(BatchTestBase.scala:103) > at > org.apache.flink.table.runtime.batch.sql.ValuesITCase.test(ValuesITCase.scala:38) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at >
[GitHub] [flink] flinkbot edited a comment on issue #8847: [FLINK-12609][python] Align the Python data types with Java
flinkbot edited a comment on issue #8847: [FLINK-12609][python] Align the Python data types with Java URL: https://github.com/apache/flink/pull/8847#issuecomment-504894006 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @sunjincheng121 [committer] * ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @sunjincheng121 [committer] * ❓ 3. Needs [attention] from. * ✅ 4. The change fits into the overall [architecture]. - Approved by @sunjincheng121 [committer] * ✅ 5. Overall code [quality] is good. - Approved by @sunjincheng121 [committer] Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8847: [FLINK-12609][python] Align the Python data types with Java
sunjincheng121 commented on a change in pull request #8847: [FLINK-12609][python] Align the Python data types with Java URL: https://github.com/apache/flink/pull/8847#discussion_r297952647 ## File path: flink-python/pyflink/table/table_environment.py ## @@ -494,9 +494,11 @@ def verify_obj(obj): raise TypeError( "schema should be RowType, list, tuple or None, but got: %s" % schema) +# verifies the elements against the specified schema +elements = map(verify_obj, elements) # converts python data to sql data elements = [schema.to_sql_type(element) for element in elements] -return self._from_elements(map(verify_obj, elements), schema) +return self._from_elements(elements, schema) Review comment: Good catch! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
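Editor's note: the diff above is an ordering fix: elements should be verified in their raw form before being converted to the SQL representation; in the old code the lazy `map(verify_obj, elements)` ended up validating already-converted values. A Java sketch of the same validate-then-convert pattern; `verify` and `toSqlType` are illustrative stand-ins, not PyFlink APIs:

```java
import java.util.List;
import java.util.stream.Collectors;

public class ValidateThenConvert {

    // Illustrative stand-in for verify_obj: reject bad input, pass good input through.
    static Integer verify(Integer element) {
        if (element == null || element < 0) {
            throw new IllegalArgumentException("element must be a non-negative int");
        }
        return element;
    }

    // Illustrative stand-in for schema.to_sql_type.
    static String toSqlType(Integer element) {
        return "INT:" + element;
    }

    public static void main(String[] args) {
        List<Integer> elements = List.of(1, 2, 3);

        // Verify the raw elements first, then convert -- verifying after the
        // conversion would check the wrong representation, as in the old code.
        List<String> converted = elements.stream()
                .map(ValidateThenConvert::verify)
                .map(ValidateThenConvert::toSqlType)
                .collect(Collectors.toList());

        System.out.println(converted); // [INT:1, INT:2, INT:3]
    }
}
```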
[GitHub] [flink] flinkbot commented on issue #8905: [FLINK-13006][hive] remove GenericUDTFReplicateRows from GenericUDTFTest because Hive 1.2.1 doesn't have it
flinkbot commented on issue #8905: [FLINK-13006][hive] remove GenericUDTFReplicateRows from GenericUDTFTest because Hive 1.2.1 doesn't have it URL: https://github.com/apache/flink/pull/8905#issuecomment-506129625 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 opened a new pull request #8905: [FLINK-13006][hive] remove GenericUDTFReplicateRows from GenericUDTFTest because Hive 1.2.1 doesn't have it
bowenli86 opened a new pull request #8905: [FLINK-13006][hive] remove GenericUDTFReplicateRows from GenericUDTFTest because Hive 1.2.1 doesn't have it URL: https://github.com/apache/flink/pull/8905 ## What is the purpose of the change This PR removes GenericUDTFReplicateRows from HiveGenericUDTFTest because Hive 1.2.1 doesn't have it, and thus the build profile for Hive 1.2.1 would fail. I've added quite a lot of test coverage for HiveGenericUDTF, so removing just this one test should be fine. ## Brief change log - remove GenericUDTFReplicateRows from GenericUDTFTest ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on issue #8889: [FLINK-12934][hive] add additional dependencies for flink-connector-hive to connect to remote hive metastore service
bowenli86 commented on issue #8889: [FLINK-12934][hive] add additional dependencies for flink-connector-hive to connect to remote hive metastore service URL: https://github.com/apache/flink/pull/8889#issuecomment-506129381 cc @xuefuz @lirui-apache @zjuwangg This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on issue #8905: [FLINK-13006][hive] remove GenericUDTFReplicateRows from GenericUDTFTest because Hive 1.2.1 doesn't have it
bowenli86 commented on issue #8905: [FLINK-13006][hive] remove GenericUDTFReplicateRows from GenericUDTFTest because Hive 1.2.1 doesn't have it URL: https://github.com/apache/flink/pull/8905#issuecomment-506129301 cc @xuefuz @lirui-apache @zjuwangg This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-13006) remove GenericUDTFReplicateRows from GenericUDTFTest because Hive 1.2.1 doesn't have it
[ https://issues.apache.org/jira/browse/FLINK-13006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-13006: --- Labels: pull-request-available (was: ) > remove GenericUDTFReplicateRows from GenericUDTFTest because Hive 1.2.1 > doesn't have it > --- > > Key: FLINK-13006 > URL: https://issues.apache.org/jira/browse/FLINK-13006 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot edited a comment on issue #8894: [FLINK-12961][datastream] Providing an internal execution method of StreamExecutionEnvironment accepting StreamGraph as input parameter
flinkbot edited a comment on issue #8894: [FLINK-12961][datastream] Providing an internal execution method of StreamExecutionEnvironment accepting StreamGraph as input parameter URL: https://github.com/apache/flink/pull/8894#issuecomment-505774524 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❗ 3. Needs [attention] from. - Needs attention by @aljoscha [PMC] * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-12953) View logs from Job view in Web Dashboard
[ https://issues.apache.org/jira/browse/FLINK-12953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873759#comment-16873759 ] vinoyang commented on FLINK-12953: -- Hi [~rmetzger] Agree. On our streaming platform, we have a strong requirement to list old log files for viewing. For long-running streaming applications, we usually configure a rolling-file strategy in the logging framework (log4j or logback); otherwise, the size of a single log file would be very large. Currently, Flink only allows users to view the latest log file, so it's not easy for users to diagnose problems that happened earlier. > View logs from Job view in Web Dashboard > > > Key: FLINK-12953 > URL: https://issues.apache.org/jira/browse/FLINK-12953 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Reporter: Chad Dombrova >Priority: Major > > As a (beam) developer I want to be able to print/log information from my > custom transforms, and then monitor that output within the job view of the > Web Dashboard, so that I don't have to go hunting through the combined log in > the Job Manager view. The Job Manager log has way too much in it, spanning > all jobs, including output logged by both flink and user code. > A good example of how this UX should work can be found in Google Dataflow: > - click on a job, and see the logged output for that job > - click on a transform, and see the logged output for just that transform > thanks! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] ifndef-SleePy commented on issue #8894: [FLINK-12961][datastream] Providing an internal execution method of StreamExecutionEnvironment accepting StreamGraph as input parameter
ifndef-SleePy commented on issue #8894: [FLINK-12961][datastream] Providing an internal execution method of StreamExecutionEnvironment accepting StreamGraph as input parameter URL: https://github.com/apache/flink/pull/8894#issuecomment-506124650 @flinkbot attention @aljoscha This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-13006) remove GenericUDTFReplicateRows from GenericUDTFTest because Hive 1.2.1 doesn't have it
Bowen Li created FLINK-13006: Summary: remove GenericUDTFReplicateRows from GenericUDTFTest because Hive 1.2.1 doesn't have it Key: FLINK-13006 URL: https://issues.apache.org/jira/browse/FLINK-13006 Project: Flink Issue Type: Sub-task Components: Connectors / Hive Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-13004) Correct the logic of needToCleanupState in KeyedProcessFunctionWithCleanupState
[ https://issues.apache.org/jira/browse/FLINK-13004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873756#comment-16873756 ] Jark Wu commented on FLINK-13004: - I think the {{timestamp}} parameter should be a primitive long. Thanks for reporting this. Could you elaborate more on why the result would return false when the value is larger than 127L? > Correct the logic of needToCleanupState in > KeyedProcessFunctionWithCleanupState > --- > > Key: FLINK-13004 > URL: https://issues.apache.org/jira/browse/FLINK-13004 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > > The current implementation of needToCleanupState in > KeyedProcessFunctionWithCleanupState actually has a potential bug: > {code:java} > protected Boolean needToCleanupState(Long timestamp) throws IOException { >if (stateCleaningEnabled) { > Long cleanupTime = cleanupTimeState.value(); > // check that the triggered timer is the last registered processing > time timer. > return null != cleanupTime && timestamp == cleanupTime; >} else { > return false; >} > } > {code} > Please note that it directly uses "==" to compare the boxed *Long* values timestamp and cleanupTime. However, if the value is larger than 127L, the comparison actually returns false instead of the expected true. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
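Editor's note: to answer the question above: "==" on two {{Long}} objects compares references, not values. {{Long.valueOf}} caches the values -128 to 127, so small boxed longs happen to share an instance and compare equal by reference, while values outside that range generally do not. A minimal demonstration:

{code:java}
public class BoxedLongEquality {
    public static void main(String[] args) {
        Long inCache1 = 127L, inCache2 = 127L;
        Long outCache1 = 128L, outCache2 = 128L;

        // Inside the Long.valueOf cache (-128..127): same instance, so == "works" by accident.
        System.out.println(inCache1 == inCache2);   // true

        // Outside the cache: distinct instances, so reference comparison fails.
        System.out.println(outCache1 == outCache2); // false

        // Correct: compare by value, or unbox to a primitive long.
        System.out.println(outCache1.equals(outCache2));                    // true
        System.out.println(outCache1.longValue() == outCache2.longValue()); // true
    }
}
{code}

This also explains the suggestion above: declaring the parameter as a primitive long forces unboxing, which turns "==" into a value comparison.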
[jira] [Commented] (FLINK-12771) Support ConnectorCatalogTable in HiveCatalog
[ https://issues.apache.org/jira/browse/FLINK-12771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873755#comment-16873755 ] Bowen Li commented on FLINK-12771: -- Temporarily putting this ticket on hold and removing it from 1.9.0 > Support ConnectorCatalogTable in HiveCatalog > > > Key: FLINK-12771 > URL: https://issues.apache.org/jira/browse/FLINK-12771 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > Currently {{HiveCatalog}} does not support {{ConnectorCatalogTable}}. There's > a major drawback here when it comes to real use cases: when Table > API users set a {{HiveCatalog}} as their default catalog (which is very > likely), they cannot create or use any inline table sources/sinks with their > default catalog any more. It's really inconvenient for Table API users to use > Flink for exploration, experimentation, and production. > There are several workarounds in this case. E.g. users have to switch their > default catalog, but that misses our original intention of having a default > {{HiveCatalog}}; or users can register their inline sources/sinks in Flink's > default catalog, which is an in-memory catalog, but that not only requires users > to type the full path of a table but also requires them to be aware of > Flink's default catalog, its default db, and their names. In short, none of the > workarounds seems reasonable or user friendly. > From another perspective, Hive has the concept of temporary tables that are > stored in the memory of the Hive metastore client and are removed when the client shuts > down. In Flink, {{ConnectorCatalogTable}} can be seen as a type of > session-based temporary table, and {{HiveCatalog}} (potentially any catalog > implementation) can store it in memory. By introducing the concept of a temp > table, we could greatly reduce friction for users and improve their > experience and productivity. > Thus, we propose adding a simple in-memory map for {{ConnectorCatalogTable}} > in {{HiveCatalog}} to allow users to create and use inline sources/sinks when > their default catalog is a {{HiveCatalog}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12771) Support ConnectorCatalogTable in HiveCatalog
[ https://issues.apache.org/jira/browse/FLINK-12771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-12771: - Fix Version/s: (was: 1.9.0) > Support ConnectorCatalogTable in HiveCatalog > > > Key: FLINK-12771 > URL: https://issues.apache.org/jira/browse/FLINK-12771 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > Currently {{HiveCatalog}} does not support {{ConnectorCatalogTable}}. There's > a major drawback here when it comes to real use cases: when Table > API users set a {{HiveCatalog}} as their default catalog (which is very > likely), they cannot create or use any inline table sources/sinks with their > default catalog any more. It's really inconvenient for Table API users to use > Flink for exploration, experimentation, and production. > There are several workarounds in this case. E.g. users have to switch their > default catalog, but that misses our original intention of having a default > {{HiveCatalog}}; or users can register their inline sources/sinks in Flink's > default catalog, which is an in-memory catalog, but that not only requires users > to type the full path of a table but also requires them to be aware of > Flink's default catalog, its default db, and their names. In short, none of the > workarounds seems reasonable or user friendly. > From another perspective, Hive has the concept of temporary tables that are > stored in the memory of the Hive metastore client and are removed when the client shuts > down. In Flink, {{ConnectorCatalogTable}} can be seen as a type of > session-based temporary table, and {{HiveCatalog}} (potentially any catalog > implementation) can store it in memory. By introducing the concept of a temp > table, we could greatly reduce friction for users and improve their > experience and productivity. > Thus, we propose adding a simple in-memory map for {{ConnectorCatalogTable}} > in {{HiveCatalog}} to allow users to create and use inline sources/sinks when > their default catalog is a {{HiveCatalog}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-13004) Correct the logic of needToCleanupState in KeyedProcessFunctionWithCleanupState
[ https://issues.apache.org/jira/browse/FLINK-13004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-13004: Component/s: Table SQL / Planner > Correct the logic of needToCleanupState in > KeyedProcessFunctionWithCleanupState > --- > > Key: FLINK-13004 > URL: https://issues.apache.org/jira/browse/FLINK-13004 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > > The current implementation of needToCleanupState in > KeyedProcessFunctionWithCleanupState actually has a potential bug: > {code:java} > protected Boolean needToCleanupState(Long timestamp) throws IOException { >if (stateCleaningEnabled) { > Long cleanupTime = cleanupTimeState.value(); > // check that the triggered timer is the last registered processing > time timer. > return null != cleanupTime && timestamp == cleanupTime; >} else { > return false; >} > } > {code} > Please note that it directly uses "==" to compare the boxed *Long* values timestamp and cleanupTime. However, if the value is larger than 127L, the comparison actually returns false instead of the expected true. -- This message was sent by Atlassian JIRA (v7.6.3#76005)