[jira] [Commented] (FLINK-3811) Refactor ExecutionEnvironment in TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15257140#comment-15257140 ]

ASF GitHub Bot commented on FLINK-3811:
---------------------------------------

GitHub user fhueske opened a pull request:

    https://github.com/apache/flink/pull/1930

    [FLINK-3811] [tableAPI] Use only Java ExecutionEnvironments in TableEnvironments

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following checklist into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

    - [X] General
      - The pull request references the related JIRA issue
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message
    - [X] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    - [X] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fhueske/flink tableJavaEnv

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1930.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1930

commit 38d06e0922932dc835abaf460d293fa96e51e4fc
Author: Fabian Hueske
Date:   2016-04-25T16:58:23Z

    [FLINK-3811] [tableAPI] Use only Java ExecutionEnvironments in TableEnvironments.
> Refactor ExecutionEnvironment in TableEnvironment
> -------------------------------------------------
>
>                 Key: FLINK-3811
>                 URL: https://issues.apache.org/jira/browse/FLINK-3811
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API
>            Reporter: Fabian Hueske
>            Assignee: Fabian Hueske
>            Priority: Minor
>             Fix For: 1.1.0
>
> Currently, the Scala BatchTableEnvironment has a reference to a Scala ExecutionEnvironment and the Java BatchTableEnvironment uses the Java ExecutionEnvironment. The same applies to their streaming counterparts.
> This requires special implementations for Java / Scala, for instance to create new data sources.
> I propose to refactor the TableEnvironments such that only the Java execution environments for batch and streaming are used.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3811] [tableAPI] Use only Java Executio...
GitHub user fhueske opened a pull request:

    https://github.com/apache/flink/pull/1930

    [FLINK-3811] [tableAPI] Use only Java ExecutionEnvironments in TableEnvironments

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following checklist into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

    - [X] General
      - The pull request references the related JIRA issue
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message
    - [X] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    - [X] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fhueske/flink tableJavaEnv

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1930.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1930

commit 38d06e0922932dc835abaf460d293fa96e51e4fc
Author: Fabian Hueske
Date:   2016-04-25T16:58:23Z

    [FLINK-3811] [tableAPI] Use only Java ExecutionEnvironments in TableEnvironments.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well.
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-3811) Refactor ExecutionEnvironment in TableEnvironment
Fabian Hueske created FLINK-3811:
------------------------------------

             Summary: Refactor ExecutionEnvironment in TableEnvironment
                 Key: FLINK-3811
                 URL: https://issues.apache.org/jira/browse/FLINK-3811
             Project: Flink
          Issue Type: Improvement
          Components: Table API
            Reporter: Fabian Hueske
            Assignee: Fabian Hueske
            Priority: Minor
             Fix For: 1.1.0

Currently, the Scala BatchTableEnvironment has a reference to a Scala ExecutionEnvironment and the Java BatchTableEnvironment uses the Java ExecutionEnvironment. The same applies to their streaming counterparts.

This requires special implementations for Java / Scala, for instance to create new data sources.

I propose to refactor the TableEnvironments such that only the Java execution environments for batch and streaming are used.
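The proposed structure can be sketched with stand-in classes: the shared table-environment logic holds only a Java execution environment, and the Scala entry point unwraps its environment instead of storing a Scala-specific reference. All class names below are illustrative stubs, not Flink's actual classes.

```java
public class TableEnvSketch {

    // stand-in for org.apache.flink.api.java.ExecutionEnvironment
    static class JavaExecutionEnvironment { }

    // stand-in for the Scala ExecutionEnvironment, which wraps a Java one
    static class ScalaExecutionEnvironment {
        private final JavaExecutionEnvironment javaEnv = new JavaExecutionEnvironment();

        JavaExecutionEnvironment getJavaEnv() {
            return javaEnv;
        }
    }

    // after the refactoring, both API variants share this single field,
    // so data-source creation etc. needs only one implementation
    static class BatchTableEnvironment {
        final JavaExecutionEnvironment execEnv;

        BatchTableEnvironment(JavaExecutionEnvironment execEnv) {
            this.execEnv = execEnv;
        }
    }

    static BatchTableEnvironment forScala(ScalaExecutionEnvironment scalaEnv) {
        // the Scala entry point just unwraps; no Scala env reference is kept
        return new BatchTableEnvironment(scalaEnv.getJavaEnv());
    }
}
```

Both the Java and the Scala construction paths then end up referencing the same underlying Java environment.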
[jira] [Updated] (FLINK-3222) Incorrect shift amount in OperatorCheckpointStats#hashCode()
[ https://issues.apache.org/jira/browse/FLINK-3222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-3222:
--------------------------
    Description:

Here is the related code:
{code}
result = 31 * result + (int) (subTaskStats.length ^ (subTaskStats.length >>> 32));
{code}
subTaskStats.length is an int. The shift amount is greater than 31 bits.

> Incorrect shift amount in OperatorCheckpointStats#hashCode()
> ------------------------------------------------------------
>
>                 Key: FLINK-3222
>                 URL: https://issues.apache.org/jira/browse/FLINK-3222
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Ted Yu
>            Priority: Minor
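The fix is simple once the semantics are spelled out: in Java, shift amounts for `int` operands are taken modulo 32, so `length >>> 32` is a no-op and `length ^ (length >>> 32)` always evaluates to zero, meaning the length never reaches the hash at all. A minimal sketch of the corrected pattern (the class and field names are illustrative, not Flink's actual code):

```java
public class CheckpointStatsSketch {

    private final long[] subTaskStats;

    public CheckpointStatsSketch(int numSubTasks) {
        this.subTaskStats = new long[numSubTasks];
    }

    @Override
    public int hashCode() {
        int result = 17;
        // The length is already an int; mix it in directly. The original
        // expression `length ^ (length >>> 32)` is always 0 for ints,
        // because the shift amount 32 is reduced mod 32 to 0.
        result = 31 * result + subTaskStats.length;
        return result;
    }
}
```

The `x ^ (x >>> 32)` folding is only meaningful for `long` values, where it condenses 64 bits into 32 before the cast to `int`.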
[jira] [Updated] (FLINK-3733) registeredTypesWithKryoSerializers is not assigned in ExecutionConfig#deserializeUserCode()
[ https://issues.apache.org/jira/browse/FLINK-3733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-3733:
--------------------------
    Description:

{code}
if (serializedRegisteredTypesWithKryoSerializers != null) {
	registeredTypesWithKryoSerializers =
		serializedRegisteredTypesWithKryoSerializers.deserializeValue(userCodeClassLoader);
} else {
	registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
}
{code}
When serializedRegisteredTypesWithKryoSerializers is null, registeredTypesWithKryoSerializers is not assigned.

> registeredTypesWithKryoSerializers is not assigned in
> ExecutionConfig#deserializeUserCode()
> -----------------------------------------------------
>
>                 Key: FLINK-3733
>                 URL: https://issues.apache.org/jira/browse/FLINK-3733
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Ted Yu
>            Priority: Minor
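The else-branch initializes a different field (`...SerializerClasses` instead of `...Serializers`), so `registeredTypesWithKryoSerializers` stays null after deserialization. A minimal sketch of the corrected branch, using simplified stand-in types in place of Flink's serialized-value wrapper:

```java
import java.io.Serializable;
import java.util.LinkedHashMap;

public class ExecutionConfigSketch {

    // stand-in for the SerializedValue<...> field in the real ExecutionConfig
    LinkedHashMap<Class<?>, Serializable> serializedRegisteredTypesWithKryoSerializers;

    LinkedHashMap<Class<?>, Serializable> registeredTypesWithKryoSerializers;

    void deserializeUserCode() {
        if (serializedRegisteredTypesWithKryoSerializers != null) {
            // the real code calls deserializeValue(userCodeClassLoader) here
            registeredTypesWithKryoSerializers = serializedRegisteredTypesWithKryoSerializers;
        } else {
            // fix: assign the field that is read later, not the
            // registeredTypesWithKryoSerializerClasses sibling
            registeredTypesWithKryoSerializers = new LinkedHashMap<>();
        }
    }
}
```

With this change, callers can rely on the map being non-null regardless of which branch was taken.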
[jira] [Updated] (FLINK-3734) Unclosed DataInputView in AbstractAlignedProcessingTimeWindowOperator#restoreState()
[ https://issues.apache.org/jira/browse/FLINK-3734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-3734:
--------------------------
    Description:

{code}
DataInputView in = inputState.getState(getUserCodeClassloader());

final long nextEvaluationTime = in.readLong();
final long nextSlideTime = in.readLong();

AbstractKeyedTimePanes panes = createPanes(keySelector, function);
panes.readFromInput(in, keySerializer, stateTypeSerializer);

restoredState = new RestoredState<>(panes, nextEvaluationTime, nextSlideTime);
}
{code}
DataInputView in is not closed upon return.

> Unclosed DataInputView in
> AbstractAlignedProcessingTimeWindowOperator#restoreState()
> -------------------------------------------------------------
>
>                 Key: FLINK-3734
>                 URL: https://issues.apache.org/jira/browse/FLINK-3734
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Ted Yu
>            Priority: Minor
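The usual remedy is try-with-resources, so the backing stream is closed on every exit path, including when a read throws. Since `DataInputView` itself is not necessarily `Closeable`, the sketch below illustrates the pattern with a plain `DataInputStream`; the two `readLong()` calls mirror the ones in `restoreState()`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RestoreSketch {

    /** Reads the two timestamps back; the stream is closed even if a read fails. */
    static long[] restore(byte[] snapshot) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot))) {
            final long nextEvaluationTime = in.readLong();
            final long nextSlideTime = in.readLong();
            return new long[] { nextEvaluationTime, nextSlideTime };
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Helper that produces a snapshot in the same layout the reader expects. */
    static byte[] snapshot(long evalTime, long slideTime) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeLong(evalTime);
            out.writeLong(slideTime);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return bos.toByteArray();
    }
}
```

In the actual operator the close would apply to whatever stream backs the `DataInputView`, typically in a finally block if the view type does not implement `Closeable`.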
[GitHub] flink pull request: Flink 3691 extend avroinputformat to support g...
Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1920#issuecomment-214463228

    One more thought, since I've had this problem before. Since your pull request started with multiple commits, the title chosen by GitHub is based on your branch name rather than the commit message. Because the formatting is different, the PR's messages were not automatically attached to the JIRA ticket, and the watchers probably haven't noticed your PR.
[GitHub] flink pull request: Flink 3691 extend avroinputformat to support g...
Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1920#issuecomment-214461886

    Thanks for your contribution @gna-phetsarath. I see there was a question in FLINK-3691 as to whether Flink already serializes GenericRecords. It looks to me like these are currently processed by `ReflectDatumReader`, whereas with this PR they would use `GenericDatumReader`. Could you give a short example of data that can't be processed currently but now works with this PR? Is it also more efficient to use `GenericDatumReader`?
[GitHub] flink pull request: FLINK-3717 - Be able to re-start reading from ...
Github user kl0u commented on the pull request:

    https://github.com/apache/flink/pull/1895#issuecomment-214448465

    Hi @aljoscha, thanks a lot for the comments. I addressed them. Please let me know what you think.
[GitHub] flink pull request: Flink 3691 extend avroinputformat to support g...
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1920#discussion_r60952095

    --- Diff: flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java ---
    @@ -289,6 +290,119 @@ public void testDeserializeToSpecificType() throws IOException {
     		}
     	}
     
    +	/**
    +	 * Test if the AvroInputFormat is able to properly read data from an Avro
    +	 * file as a GenericRecord.
    +	 *
    +	 * @throws IOException if there is an exception
    +	 */
    +	@SuppressWarnings("unchecked")
    +	@Test
    +	public void testDeserialisationGenericRecord() throws IOException {
    +		Configuration parameters = new Configuration();
    +
    +		AvroInputFormat format = new AvroInputFormat(new Path(testFile.getAbsolutePath()),
    +				GenericRecord.class);
    +		try {
    +			format.configure(parameters);
    --- End diff --

    Should we explicitly set reuse to `true`?
[GitHub] flink pull request: Flink 3691 extend avroinputformat to support g...
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1920#discussion_r60950868

    --- Diff: flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java ---
    @@ -289,6 +290,119 @@ public void testDeserializeToSpecificType() throws IOException {
     		}
     	}
     
    +	/**
    +	 * Test if the AvroInputFormat is able to properly read data from an Avro
    +	 * file as a GenericRecord.
    +	 *
    +	 * @throws IOException if there is an exception
    +	 */
    +	@SuppressWarnings("unchecked")
    +	@Test
    +	public void testDeserialisationGenericRecord() throws IOException {
    +		Configuration parameters = new Configuration();
    +
    +		AvroInputFormat format = new AvroInputFormat(new Path(testFile.getAbsolutePath()),
    +				GenericRecord.class);
    +		try {
    +			format.configure(parameters);
    +			FileInputSplit[] splits = format.createInputSplits(1);
    +			assertEquals(splits.length, 1);
    +			format.open(splits[0]);
    +
    +			GenericRecord u = format.nextRecord(null);
    --- End diff --

    I think this should be `format.nextRecord()`, since `nextRecord(reuse)` requires an object to be passed in.
[GitHub] flink pull request: Flink 3691 extend avroinputformat to support g...
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1920#discussion_r60949545

    --- Diff: flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java ---
    @@ -119,12 +144,18 @@ public E nextRecord(E reuseValue) throws IOException {
     		if (reachedEnd()) {
     			return null;
     		}
    -
    -		if (!reuseAvroValue) {
    -			reuseValue = InstantiationUtil.instantiate(avroValueType, Object.class);
    +		if (isGenericRecord) {
    --- End diff --

    Can we replace the nested if with `return reuseAvroValue ? dataFileReader.next(reuseValue) : dataFileReader.next();`, remove the `isGenericRecord` field, and flatten the `GenericDatumReader` / `SpecificDatumReader` / `ReflectDatumReader` if-statement above?
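The suggested flattening can be sketched against a generic reader interface. The interface and method names below are illustrative stand-ins, not Avro's actual `DataFileReader` API:

```java
public class NextRecordSketch {

    // hypothetical reader: one overload allocates, the other reuses
    interface DataFileReader<E> {
        E next();          // allocates a fresh record
        E next(E reuse);   // fills the caller-supplied record
    }

    // one ternary expression replaces the nested if/else and any
    // record-kind flag such as isGenericRecord
    static <E> E nextRecord(DataFileReader<E> reader, E reuseValue, boolean reuseAvroValue) {
        return reuseAvroValue ? reader.next(reuseValue) : reader.next();
    }
}
```

The reuse flag alone decides which overload runs, so the format class no longer needs to branch on the record type at read time.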
[GitHub] flink pull request: Flink 3691 extend avroinputformat to support g...
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1920#discussion_r60948974

    --- Diff: flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java ---
    @@ -51,11 +62,13 @@
     	private transient FileReader dataFileReader;
     
     	private transient long end;
    +
    +	private boolean isGenericRecord = false;
     
     	public AvroInputFormat(Path filePath, Class type) {
     		super(filePath);
     		this.avroValueType = type;
    +		this.isGenericRecord = org.apache.avro.generic.GenericRecord.class == avroValueType;
    --- End diff --

    Should this be testing class equality or using Class.isAssignableFrom?
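The distinction matters because `==` only matches the exact class object, while `isAssignableFrom` also accepts subtypes; a caller passing a concrete implementation class rather than the interface itself would miss the `==` check. A quick illustration with standard collection types:

```java
import java.util.ArrayList;
import java.util.List;

public class AssignableSketch {

    /** True only when the two Class objects are identical. */
    static boolean exactMatch(Class<?> expected, Class<?> actual) {
        return expected == actual;
    }

    /** True when actual is the same type as, or a subtype of, expected. */
    static boolean subtypeMatch(Class<?> expected, Class<?> actual) {
        return expected.isAssignableFrom(actual);
    }
}
```

Applied to the diff: `GenericRecord.class == avroValueType` only fires when the user passes the interface class itself, whereas `GenericRecord.class.isAssignableFrom(avroValueType)` would also route implementation classes to the generic reader.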
[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60945888

    --- Diff: flink-libraries/flink-cep-scala/pom.xml ---
    @@ -0,0 +1,122 @@
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<parent>
    +		<groupId>org.apache.flink</groupId>
    +		<artifactId>flink-libraries</artifactId>
    +		<version>1.1-SNAPSHOT</version>
    +		<relativePath>..</relativePath>
    +	</parent>
    +
    +	<artifactId>flink-cep-scala_2.10</artifactId>
    +	<name>flink-cep-scala</name>
    +	<packaging>jar</packaging>
    +
    +	<dependencies>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-cep_2.10</artifactId>
    +			<version>${project.version}</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-streaming-scala_2.10</artifactId>
    +			<version>${project.version}</version>
    +			<scope>provided</scope>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.ow2.asm</groupId>
    +			<artifactId>asm</artifactId>
    +			<version>${asm.version}</version>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-tests_2.10</artifactId>
    +			<version>${project.version}</version>
    +			<scope>test</scope>
    +			<type>test-jar</type>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-test-utils_2.10</artifactId>
    +			<version>${project.version}</version>
    +			<scope>test</scope>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-streaming-java_2.10</artifactId>
    +			<version>${project.version}</version>
    +			<scope>test</scope>
    +			<type>test-jar</type>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-streaming-scala_2.10</artifactId>
    +			<version>${project.version}</version>
    +			<scope>test</scope>
    +			<type>test-jar</type>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-cep_2.10</artifactId>
    +			<version>${project.version}</version>
    +			<scope>test</scope>
    +			<type>test-jar</type>
    +		</dependency>
    +	</dependencies>
    +
    +	<build>
    +		<plugins>
    +			<plugin>
    +				<groupId>org.scala-tools</groupId>
    +				<artifactId>maven-scala-plugin</artifactId>
    +				<version>2.15.2</version>
    +				<executions>
    +					<execution>
    +						<goals>
    +							<goal>compile</goal>
    +							<goal>testCompile</goal>
    +						</goals>
    +					</execution>
    +				</executions>
    +				<configuration>
    +					<sourceDir>src/main/scala</sourceDir>
    +					<testSourceDir>src/test/scala</testSourceDir>
    +					<jvmArgs>
    +						<jvmArg>-Xms64m</jvmArg>
    +						<jvmArg>-Xmx1024m</jvmArg>
    +					</jvmArgs>
    +				</configuration>
    +			</plugin>
    --- End diff --

    Can we maybe replace this plugin with

    ```
    <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
            <execution>
                <goals>
                    <goal>compile</goal>
                    <goal>testCompile</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
    ```

    I gave you the definition for the old Scala plugin. Sorry, my bad.
[jira] [Created] (FLINK-3810) Missing break in ZooKeeperLeaderElectionService#handleStateChange()
Ted Yu created FLINK-3810:
------------------------------

             Summary: Missing break in ZooKeeperLeaderElectionService#handleStateChange()
                 Key: FLINK-3810
                 URL: https://issues.apache.org/jira/browse/FLINK-3810
             Project: Flink
          Issue Type: Bug
            Reporter: Ted Yu

{code}
protected void handleStateChange(ConnectionState newState) {
	switch (newState) {
		case CONNECTED:
			LOG.debug("Connected to ZooKeeper quorum. Leader election can start.");
		case SUSPENDED:
			LOG.warn("Connection to ZooKeeper suspended. The contender " + leaderContender.getAddress()
				+ "no longer participates in the leader election.");
		case RECONNECTED:
			LOG.info("Connection to ZooKeeper was reconnected. Leader election can be restarted.");
		case LOST:
			// Maybe we have to throw an exception here to terminate the JobManager
			LOG.warn("Connection to ZooKeeper lost. The contender " + leaderContender.getAddress()
				+ "no longer participates in the leader election.");
{code}

Because no case ends with a break, any of the first three states falls through and produces multiple log messages.
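Without `break`, a `CONNECTED` event falls through and also emits the SUSPENDED, RECONNECTED, and LOST messages. A minimal sketch of the corrected switch, applicable to both this issue and FLINK-3809, using a stand-in enum and a message list in place of the SLF4J logger:

```java
import java.util.ArrayList;
import java.util.List;

public class StateChangeSketch {

    enum ConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

    static List<String> handleStateChange(ConnectionState newState) {
        List<String> log = new ArrayList<>();
        switch (newState) {
            case CONNECTED:
                log.add("Connected to ZooKeeper quorum.");
                break; // without this, execution falls through into SUSPENDED
            case SUSPENDED:
                log.add("Connection to ZooKeeper suspended.");
                break;
            case RECONNECTED:
                log.add("Connection to ZooKeeper was reconnected.");
                break;
            case LOST:
                log.add("Connection to ZooKeeper lost.");
                break;
        }
        return log;
    }
}
```

With the breaks in place, each state change produces exactly one log message.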
[jira] [Created] (FLINK-3809) Missing break in ZooKeeperLeaderRetrievalService#handleStateChange()
Ted Yu created FLINK-3809:
------------------------------

             Summary: Missing break in ZooKeeperLeaderRetrievalService#handleStateChange()
                 Key: FLINK-3809
                 URL: https://issues.apache.org/jira/browse/FLINK-3809
             Project: Flink
          Issue Type: Bug
            Reporter: Ted Yu

{code}
protected void handleStateChange(ConnectionState newState) {
	switch (newState) {
		case CONNECTED:
			LOG.debug("Connected to ZooKeeper quorum. Leader retrieval can start.");
		case SUSPENDED:
			LOG.warn("Connection to ZooKeeper suspended. Can no longer retrieve the leader from "
				+ "ZooKeeper.");
		case RECONNECTED:
			LOG.info("Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.");
		case LOST:
			LOG.warn("Connection to ZooKeeper lost. Can no longer retrieve the leader from "
				+ "ZooKeeper.");
	}
{code}

Except for the LOST state, each of the other states falls through and produces multiple log messages.
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1929#discussion_r60943147

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java ---
    @@ -0,0 +1,225 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.FileStatus;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.JobException;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * This is the single (non-parallel) task which monitors a user-provided path and assigns splits
    + * to downstream tasks for further reading and processing. Which splits will be further processed
    + * depends on the user-provided {@link FileSplitMonitoringFunction.WatchType}.
    + */
    +public class FileSplitMonitoringFunction
    +		extends RichSourceFunction {
    --- End diff --

    Actually, in addSource() there is this line:

        boolean isParallel = function instanceof ParallelSourceFunction;

    whose result is passed to the constructor of the StreamSource.
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1929#discussion_r60942903

    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionITCase.java ---
    @@ -0,0 +1,279 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.StreamingProgramTestBase;
    +import org.apache.hadoop.fs.FSDataOutputStream;
    +import org.apache.hadoop.fs.FileUtil;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hdfs.MiniDFSCluster;
    +import org.junit.After;
    +import org.junit.Assert;
    +import org.junit.Before;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.fail;
    +
    +public class FileSplitMonitoringFunctionITCase extends StreamingProgramTestBase {
    +
    +	private static final int NO_OF_FILES = 10;
    +	private static final int LINES_PER_FILE = 10;
    +
    +	private static final long INTERVAL = 200;
    +
    +	private File baseDir;
    +
    +	private org.apache.hadoop.fs.FileSystem hdfs;
    +	private String hdfsURI;
    +	private MiniDFSCluster hdfsCluster;
    --- End diff --

    I would prefer local files since:
    * we would no longer require the hadoop dependency
    * it probably reduces the test time
    * you interact very little with the FileSystem anyway
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1929#discussion_r60942837

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java ---
    @@ -0,0 +1,225 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import org.apache.flink.api.common.io.FileInputFormat;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.FileStatus;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.JobException;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * This is the single (non-parallel) task which monitors a user-provided path and assigns splits
    + * to downstream tasks for further reading and processing. Which splits will be further processed
    + * depends on the user-provided {@link FileSplitMonitoringFunction.WatchType}.
    + */
    +public class FileSplitMonitoringFunction
    +		extends RichSourceFunction {
    --- End diff --

    Yup, you're right; I didn't know about the RichParallelSourceFunction class.
[GitHub] flink pull request: [FLINK-1827] and small fixes in some tests
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1915#discussion_r60942417

    --- Diff: flink-test-utils/pom.xml ---
    @@ -7,9 +7,7 @@ regarding copyright ownership. The ASF licenses this file
       to you under the Apache License, Version 2.0 (the
       "License"); you may not use this file except in compliance
       with the License. You may obtain a copy of the License at
    -
    --- End diff --

    The license header looks to have been automatically reformatted.
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60942382 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java --- @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.JobException; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * This is the single (non-parallel) task which monitors a user-provided path and assigns splits + * to downstream tasks for further reading and processing. 
Which splits will be further processed + * depends on the user-provided {@link FileSplitMonitoringFunction.WatchType}. + */ +public class FileSplitMonitoringFunction + extends RichSourceFunction { --- End diff -- addSource(...) makes no such guarantee; all calls like readFile(...) etc. go through addSource(...). You could be right about the NonParallelInput though, will check quickly.
[GitHub] flink pull request: [FLINK-1827] and small fixes in some tests
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1915#issuecomment-214425488 Thanks for your contribution @stefanobortoli :-) I think the changes look good and can be merged.
[GitHub] flink pull request: [FLINK-1827] and small fixes in some tests
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1915#discussion_r60940770 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java --- @@ -53,41 +59,55 @@ public void testStartupWhenTaskmanagerActorPortIsUsed() { blocker = new ServerSocket(0, 50, localAddress); final int port = blocker.getLocalPort(); - try { - TaskManager.runTaskManager( - localHostName, - ResourceID.generate(), - port, - new Configuration(), + TaskManager.runTaskManager(localHostName, ResourceID.generate(), port, new Configuration(), TaskManager.class); - fail("This should fail with an IOException"); - } - catch (IOException e) { - // expected. validate the error message - assertNotNull(e.getMessage()); - assertTrue(e.getMessage().contains("Address already in use")); + fail("This should fail with an IOException"); + + } catch (IOException e) { + // expected. validate the error messagex + List causes = getExceptionCauses(e, new ArrayList()); + for (Throwable cause : causes) { + if (cause instanceof BindException) { + throw (BindException) cause; + } } - - } - catch (Exception e) { + fail("This should fail with an exception caused by BindException"); + } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); - } - finally { + } finally { if (blocker != null) { try { blocker.close(); - } - catch (IOException e) { + } catch (IOException e) { // no need to log here } } } } /** -* Tests that the TaskManager startup fails synchronously when the I/O directories are -* not writable. +* A utility method to analyze the exceptions and collect the clauses +* +* @param e +*the root exception (Throwable) object +* @param causes +*the list of exceptions that caused the root exceptions +* @return +*/ + private List getExceptionCauses(Throwable e, List causes) { --- End diff -- What about factoring this method out into a Utils class so that it can be used by multiple tests. 
This would avoid code duplication.
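The cause-collection helper discussed above is plain Java and could live in a shared test utility, as suggested. A minimal, Flink-independent sketch (class and method names here are hypothetical, not the PR's actual code):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical shared test utility: walks a Throwable's cause chain so tests
// can search it for a specific cause type such as BindException.
class ExceptionCauseUtils {

    /** Returns all causes of {@code root}, outermost first, guarding against cycles. */
    public static List<Throwable> getExceptionCauses(Throwable root) {
        List<Throwable> causes = new ArrayList<>();
        Throwable current = root.getCause();
        while (current != null && !causes.contains(current)) {
            causes.add(current);
            current = current.getCause();
        }
        return causes;
    }

    /** Convenience check, used the way the test above unwraps a BindException. */
    public static boolean hasCause(Throwable root, Class<? extends Throwable> type) {
        for (Throwable cause : getExceptionCauses(root)) {
            if (type.isInstance(cause)) {
                return true;
            }
        }
        return false;
    }
}
```

Factoring it out this way lets TaskManagerStartupTest and any other test assert on a nested cause without duplicating the traversal loop.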
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1929#issuecomment-214420551 We may want to think about adding a createInputSplits(int minNumSplits, List files) to the FileInputFormat class; as it stands, it scans through the entire directory even though we could already exclude several files.
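The idea above is to reuse the file listing the monitor already produced instead of re-scanning the directory inside split creation. A rough sketch of such an overload, where all types (SimpleFile, SimpleSplit, SplitPlanner) are simplified stand-ins for Flink's FileStatus/FileInputSplit and not the actual API:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the suggested overload: the caller passes in the files it has
// already listed and filtered, and splits are created from that list only.
class SimpleFile {
    final String path;
    final long length;
    SimpleFile(String path, long length) { this.path = path; this.length = length; }
}

class SimpleSplit {
    final String path;
    final long start;
    final long length;
    SimpleSplit(String path, long start, long length) {
        this.path = path; this.start = start; this.length = length;
    }
}

class SplitPlanner {
    /** Creates splits from a pre-filtered file list; no directory scan happens here. */
    static List<SimpleSplit> createInputSplits(int minNumSplits, List<SimpleFile> files) {
        long totalLength = 0;
        for (SimpleFile f : files) {
            totalLength += f.length;
        }
        // Aim for at least minNumSplits splits across all files.
        long splitSize = Math.max(1, totalLength / Math.max(1, minNumSplits));
        List<SimpleSplit> splits = new ArrayList<>();
        for (SimpleFile f : files) {
            long pos = 0;
            while (pos < f.length) {
                long len = Math.min(splitSize, f.length - pos);
                splits.add(new SimpleSplit(f.path, pos, len));
                pos += len;
            }
        }
        return splits;
    }
}
```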
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60939797 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java --- (same license header, imports, and class Javadoc as quoted above) +public class FileSplitMonitoringFunction + extends RichSourceFunction { --- End diff -- This is used for input formats, AFAIK. The RichSourceFunction is guaranteed to be of parallelism 1, as I understand it from addSource(SourceFunction function, String sourceName, TypeInformation typeInfo) in the StreamExecutionEnvironment.
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60939583 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java --- @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.JobException; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * This is the single (non-parallel) task which monitors a user-provided path and assigns splits + * to downstream tasks for further reading and processing. 
Which splits will be further processed + * depends on the user-provided {@link FileSplitMonitoringFunction.WatchType}. + */ +public class FileSplitMonitoringFunction + extends RichSourceFunction { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class); + + /** +* Specifies when computation will be triggered. +*/ + public enum WatchType { + REPROCESS_WITH_APPENDED // Reprocesses the whole file when new data is appended. + } + + /** The path to monitor. */ + private final String path; + + /** The default parallelism for the job, as this is going to be the parallelism of the downstream readers. */ + private final int readerParallelism; + + /** The {@link FileInputFormat} to be read. */ + private FileInputFormat format; + + /** How often to monitor the state of the directory for new data. */ + private final long interval; + + /** Which new data to process (see {@link WatchType}. */ + private final WatchType watchType; + + private long globalModificationTime; + + private FilePathFilter pathFilter; + + private volatile boolean isRunning = true; + + public FileSplitMonitoringFunction( + FileInputFormat format, String path, + WatchType watchType, int readerParallelism, long interval) { + + this(format, path, FilePathFilter.DefaultFilter.getInstance(), watchType, readerParallelism, interval); + } + + public FileSplitMonitoringFunction( + FileInputFormat format, String path, FilePathFilter filter, + WatchType watchType, int readerParallelism, long interval) { + + this.format = Preconditions.checkNotNull(format); + this.path = Preconditions.checkNotNull(path); + + Preconditions.checkArgument(interval >= 100, + "The specified monitoring interval is smaller than the minimum allowed one (100 ms)."); + this.interval = interval; + + this.watchType = watchType; + + this.pathFilter = Preconditions.checkNotNull(filter); + + this.readerParallelism = Math.max(readerParallelism, 1); + 
this.globalModificationTime = Long.MIN_VALUE; + } + + @Override + @SuppressWarnings("unchecked") + public void open(Configuration parameters) throws Exception { + super.open(parameters); + format.configure(parameters); + } + + /** +* Creates the input splits for the path to be assigned to the downstream tasks. +* Those are going to read their contents for further
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60939625 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java --- (same diff hunk as quoted in the previous message)
[GitHub] flink pull request: [FLINK-3778] [shell] Forward configuration fro...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1906
[jira] [Closed] (FLINK-3778) ScalaShellRemoteStreamEnvironment cannot be forwarded a user configuration
[ https://issues.apache.org/jira/browse/FLINK-3778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-3778. Resolution: Fixed Fixed via 7498d3e35a29449270a88a30eb32b7de74887f5b > ScalaShellRemoteStreamEnvironment cannot be forwarded a user configuration > -- > > Key: FLINK-3778 > URL: https://issues.apache.org/jira/browse/FLINK-3778 > Project: Flink > Issue Type: Improvement > Components: Scala Shell >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > The {{ScalaShellRemoteStreamEnvironment}}, unlike the {{ScalaShellRemoteEnvironment}}, cannot be configured with a user configuration. This effectively prevents a user from connecting to an HA cluster. I think it would be good to be able to specify a user configuration. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60938941 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java --- (same class preamble as quoted above) + private volatile boolean isRunning = true; --- End diff -- Now this is really nit-picky, but I would prefer this being false by default and being set to true in open().
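A minimal sketch of the lifecycle the reviewer suggests: the flag starts out false and is only raised once open() has run, so a not-yet-initialized function never reports itself as running. The class below is a simplified stand-in for illustration, not Flink's actual RichSourceFunction API:

```java
// Simplified stand-in for a rich source's lifecycle (not Flink's real API):
// isRunning defaults to false and is raised only in open(), as suggested above.
class MonitoringLifecycle {
    private volatile boolean isRunning = false;

    /** Called once before the main work loop starts. */
    public void open() {
        isRunning = true;
    }

    /** Main loop: does work only while the flag stays raised. */
    public int run(int maxIterations) {
        int iterations = 0;
        while (isRunning && iterations < maxIterations) {
            iterations++;  // stand-in for one monitoring pass
        }
        return iterations;
    }

    /** Called asynchronously to stop the loop. */
    public void cancel() {
        isRunning = false;
    }

    public boolean isRunning() {
        return isRunning;
    }
}
```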
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60938712 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java --- (same class Javadoc and declaration as quoted above) +public class FileSplitMonitoringFunction + extends RichSourceFunction { --- End diff -- This function should implement NonParallelInput; it guarantees that it will always run with a parallelism of 1.
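NonParallelInput is a marker interface: it declares no methods, and the environment checks for it via instanceof to pin the operator's parallelism to 1. A simplified, Flink-free sketch of that mechanism, where all types are illustrative stand-ins rather than Flink's real classes:

```java
// Simplified stand-ins illustrating the marker-interface mechanism.
interface SimpleSource {
    void run();
}

// Marker interface: carries no methods, only the "do not parallelize" signal.
interface NonParallelInput { }

class MiniEnvironment {
    /** Returns the parallelism the environment would assign to this source. */
    static int resolveParallelism(SimpleSource source, int defaultParallelism) {
        return (source instanceof NonParallelInput) ? 1 : defaultParallelism;
    }
}

class MonitorSource implements SimpleSource, NonParallelInput {
    @Override
    public void run() {
        // would monitor the directory and emit splits
    }
}
```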
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60937977 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java --- @@ -0,0 +1,64 @@ (standard Apache license header) +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.Path; + +import java.io.Serializable; + +public interface FilePathFilter extends Serializable { --- End diff -- Please add a JavaDoc to this class describing how/where it is used.
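The requested documentation could look roughly like this. The sketch uses plain String paths instead of Flink's Path, and the DefaultFilter rules shown (hidden files, underscore-prefixed files, in-flight copies) are an assumption for illustration, not a quote of the PR's code:

```java
import java.io.Serializable;

/**
 * Decides which files under the monitored directory are handed to the readers.
 * The monitoring source consults the filter for every file it lists; files for
 * which {@link #filterPath(String)} returns {@code true} are skipped entirely
 * and never turned into input splits.
 */
interface FilePathFilter extends Serializable {

    /** Returns {@code true} if the given path should be excluded from processing. */
    boolean filterPath(String fileName);

    /** Assumed default: skip hidden, underscore-prefixed, and in-flight copy files. */
    class DefaultFilter implements FilePathFilter {
        @Override
        public boolean filterPath(String fileName) {
            return fileName.startsWith(".")
                    || fileName.startsWith("_")
                    || fileName.contains("_COPYING_");
        }
    }
}
```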
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60937348 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java --- @@ -35,12 +35,9 @@ import java.util.Set; /** - * This is the single (non-parallel) task, that monitors a user-procided path, and assigns splits - * to downstream tasks for further reading and processing, depending on the user-provided {@link FileSplitMonitoringFunction.WatchType}. - * - * This method keeps track of which splits have already being processed by which task, and at which point - * in the file we are currently processing, at the granularity of the split. In addition, it keeps track - * of the last modification time for each file, so that it can detect new data. + * This is the single (non-parallel) task, that monitors a user-provided path, and assigns splits --- End diff -- both commas don't belong in this sentence.
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60936805 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionITCase.java --- @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class FileSplitMonitoringFunctionITCase extends StreamingProgramTestBase { + + private static final int NO_OF_FILES = 10; + private static final int LINES_PER_FILE = 10; + + private static final long INTERVAL = 200; + + private File baseDir; + + private org.apache.hadoop.fs.FileSystem hdfs; + private String hdfsURI; + private MiniDFSCluster hdfsCluster; --- End diff -- Yes, a local filesystem would do the job. Just wanted to have some tests with HDFS to be sure that it works, as this is closer to a distributed deployment.
[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r60936652 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.table.test + +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.{ExecutionEnvironment, _} +import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class SortITCase( +mode: TestExecutionMode, +configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + @Test + def testOrderByDesc(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) + +val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv) + .orderBy('_1.desc) + +val expected = "3,Third\n2,Second\n1,First" +val results = t.toDataSet[Row].setParallelism(1).collect() +compareOrderedResultAsText(expected, results) + } + + @Test + def testOrderByAsc(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) + +val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv) + .orderBy('_1.asc) + +val expected = "1,First\n2,Second\n3,Third" +val results = t.toDataSet[Row].setParallelism(1).collect() +compareOrderedResultAsText(expected, results) + } + + @Test + def testOrderByMultipleFieldsDifferentDirections(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) + +env.setParallelism(2) +val t = env.fromElements((1, 3, "Third"), (1, 2, "Fourth"), (1, 4, "Second"), + (2, 1, 
"Sixth"), (1, 5, "First"), (1, 1, "Fifth")) + .toTable(tEnv).orderBy('_1.asc, '_2.desc) + +val expected = "1,5,First\n1,4,Second\n1,3,Third\n1,2,Fourth\n1,1,Fifth\n2,1,Sixth" +val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() + +implicit def rowOrdering = Ordering.by((x : Row) => (x.productElement(0).asInstanceOf[Int], --- End diff -- Not sure about this comment. I think it would (in fact, I checked it)... Maybe you missed the next line (81)? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
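The ordering that the test above expects from `orderBy('_1.asc, '_2.desc)` — ascending on the first field, then descending on the second — can be sketched outside Flink with a plain Java comparator chain. The class and method names below are invented for this illustration; they are not part of Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Illustrative sketch: multi-field sort with different directions per field,
// mirroring orderBy('_1.asc, '_2.desc) in the SortITCase test above.
public class MultiFieldSort {

    /** Sorts (a, b) pairs by a ascending, then b descending. */
    public static List<int[]> sortRows(List<int[]> rows) {
        List<int[]> sorted = new ArrayList<>(rows);
        Comparator<int[]> byFirstAsc = Comparator.comparingInt(r -> r[0]);
        Comparator<int[]> bySecondDesc =
            Comparator.<int[]>comparingInt(r -> r[1]).reversed();
        sorted.sort(byFirstAsc.thenComparing(bySecondDesc));
        return sorted;
    }

    public static void main(String[] args) {
        // Same pairs as in testOrderByMultipleFieldsDifferentDirections.
        List<int[]> rows = Arrays.asList(
            new int[]{1, 3}, new int[]{1, 2}, new int[]{1, 4},
            new int[]{2, 1}, new int[]{1, 5}, new int[]{1, 1});
        for (int[] r : sortRows(rows)) {
            System.out.println(r[0] + "," + r[1]);
        }
    }
}
```

Chaining `thenComparing` with a `reversed()` inner comparator is the standard way to express mixed sort directions, which is essentially what the `Ordering.by` tuple in the test encodes on the Scala side.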
[GitHub] flink pull request: [FLINK-3778] [shell] Forward configuration fro...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1906#issuecomment-214406242 Failing test cases are unrelated. Merging.
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60935904 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java --- @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.JobException; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * This is the single (non-parallel) task, that monitors a user-procided path, and assigns splits + * to downstream tasks for further reading and processing, depending on the user-provided {@link FileSplitMonitoringFunction.WatchType}. 
+ * + * This method keeps track of which splits have already being processed by which task, and at which point + * in the file we are currently processing, at the granularity of the split. In addition, it keeps track + * of the last modification time for each file, so that it can detect new data. + */ +public class FileSplitMonitoringFunction + extends RichSourceFunction { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class); + + /** +* Specifies when computation will be triggered. This can be currently done in 3 ways. +* {@code ONLY_NEW_FILES} which implies that only new files will be processed. +* {@code REPROCESS_WITH_APPENDED} which reprocesses the whole file, as soon as new data is appended to it. +*/ + public enum WatchType { + REPROCESS_WITH_APPENDED // Reprocesses the whole file when new data is appended. + } + + /** The path to monitor. */ + private final String path; + + /** The default parallelism for the job, as this is going to be the parallelism of the downstream readers. */ + private final int readerParallelism; + + /** The {@link FileInputFormat} to be read. */ + private FileInputFormat format; + + /** How often to monitor the state of the directory for new data. */ + private final long interval; + + /** Which new data to process (see {@link WatchType}. 
*/ + private final WatchType watchType; + + private long globalModificationTime; + + private FilePathFilter pathFilter; + + private volatile boolean isRunning = true; + + public FileSplitMonitoringFunction( + FileInputFormat format, String path, + WatchType watchType, int readerParallelism, long interval) { + + this(format, path, FilePathFilter.DefaultFilter.getInstance(), watchType, readerParallelism, interval); + } + + public FileSplitMonitoringFunction( + FileInputFormat format, String path, FilePathFilter filter, + WatchType watchType, int readerParallelism, long interval) { + + this.format = Preconditions.checkNotNull(format); + this.path = Preconditions.checkNotNull(path); + + Preconditions.checkArgument(interval >= 100, + "The specified monitoring interval is smaller than the minimum allowed one (100 ms)."); + this.interval = interval; + + this.watchType = watchType; + + this.pathFilter = Preconditions.checkNotNull(filter); + +
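The monitoring idea quoted above — a single non-parallel task that periodically scans a path, tracks a global modification time, and forwards only newly modified files — can be sketched with the bookkeeping reduced to comparing listing timestamps. All names and structure below are illustrative, not Flink's actual code; a `Map` of path to modification time stands in for a real `FileSystem` listing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Minimal sketch of the bookkeeping behind a file-monitoring source:
// remember the newest modification timestamp seen so far and, on each
// scan, report only entries strictly newer than it.
public class DirectoryMonitor {

    private long globalModificationTime = Long.MIN_VALUE;

    /** Returns the paths whose modification time is newer than any seen before. */
    public List<String> scan(Map<String, Long> listing) {
        List<String> fresh = new ArrayList<>();
        long maxSeen = globalModificationTime;
        for (Map.Entry<String, Long> e : listing.entrySet()) {
            if (e.getValue() > globalModificationTime) {
                fresh.add(e.getKey());
                maxSeen = Math.max(maxSeen, e.getValue());
            }
        }
        // Advance the watermark only after the whole listing is examined,
        // so files sharing a timestamp within one scan are all reported.
        globalModificationTime = maxSeen;
        return fresh;
    }
}
```

A real source would additionally split each fresh file into `FileInputSplit`s for the downstream readers and loop with the configured monitoring interval; this sketch only shows why a strict `>` comparison plus a deferred watermark update avoids reprocessing unchanged files.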
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60933644 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionTest.java --- @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.commons.io.IOUtils; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +public class FileSplitMonitoringFunctionTest { + + private static final int NO_OF_FILES = 10; + private static final int LINES_PER_FILE = 10; + + private static final long INTERVAL = 200; + + private File baseDir; + + private org.apache.hadoop.fs.FileSystem hdfs; + private String hdfsURI; + private MiniDFSCluster hdfsCluster; + + private Set hdPaths = new HashSet<>(); + private Set hdPathNames = new HashSet<>(); + private MaphdPathContents = new HashMap<>(); + + // PREPARING FOR THE TESTS + + @Before + public void createHDFS() { + try { + baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile(); + FileUtil.fullyDelete(baseDir); + + org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration(); + hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); + hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set. 
+ + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf); + hdfsCluster = builder.build(); + + hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/"; + hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf); + + } catch(Throwable e) { + e.printStackTrace(); + Assert.fail("Test failed " + e.getMessage()); + } + } + + @After + public void destroyHDFS() { + try { + for(org.apache.hadoop.fs.Path file: hdPaths) { + hdfs.delete(file, false); + } + FileUtil.fullyDelete(baseDir); + hdfsCluster.shutdown(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + // END OF PREPARATIONS + + // TESTS + + @Test + public void testFileContents() throws IOException { --- End diff -- this test doesn't test anything in regards to the MonitoringFunction, does it? ---
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60934370 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionTest.java (same excerpt as quoted above) ---
[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60931810 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala --- @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala.pattern + +import org.apache.flink.api.common.functions.FilterFunction +import org.apache.flink.cep +import org.apache.flink.cep.pattern.{Pattern => JPattern} +import org.apache.flink.streaming.api.windowing.time.Time + +import scala.reflect.ClassTag + +/** + * Base class for a pattern definition. + * + * A pattern definition is used by [[org.apache.flink.cep.nfa.compiler.NFACompiler]] to create + * a [[org.apache.flink.cep.nfa.NFA]]. 
+ * + * {{{ + * Pattern pattern = Pattern.begin("start") + * .next("middle").subtype(F.class) + * .followedBy("end").where(new MyFilterFunction()); + * } + * }}} + * + * @param jPattern Underlying Java API Pattern + * @tparam T Base type of the elements appearing in the pattern + * @tparam F Subtype of T to which the current pattern operator is constrained + */ +class Pattern[T: ClassTag, F <: T](jPattern: JPattern[T, F]) { + + private[flink] def getWrappedPattern = jPattern + + /** +* +* @return Name of the pattern operator +*/ + def getName: String = jPattern.getName --- End diff -- In the Scala world one does not write Java-bean-like getters. Instead one would simply write `def name = jPattern.getName()`. Note that whenever we call Java functions from Scala we add parentheses even though they might not be needed. This is to underline that one is calling a Java function, which does not have the notion of purity. If we really want to make the API identical, meaning that we offer functions like `getName` and `getWindowTime` in the Scala API as well, then we should add parentheses to their definitions to make clear that this is purposefully a Java-bean-like getter definition. ---
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60932653 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/DefaultPathFilter.java --- @@ -0,0 +1,7 @@ +package org.apache.flink.streaming.api.functions.source; + +/** + * Created by kkloudas on 4/25/16. --- End diff -- What do you think? ---
[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1905#issuecomment-214391606 Good work @StefanRRichter. I had only some minor comments left. Once these are addressed, the PR should be ready to be merged :-)
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60932233 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java (same excerpt as quoted above) ---
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60932263 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java (same excerpt as quoted above) ---
[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60932255 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala (same excerpt as quoted above, at `def getName`) --- End diff -- I know this might be a bit confusing since you don't know the convention, which is only "stated" implicitly. ---
[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1926#issuecomment-214388270 Thanks for the PR @dawidwys. I had a few comments. Please let me know if you have any questions. Regarding a follow-up task, we have several open issues to extend / improve the Table API. For instance, you could add support for outer joins. That should not be too hard, because the DataSet API supports them natively. Alternatively, we will soon have an interface to define external tables, and there will be issues to implement the interface for different storage systems (HBase, ...) or formats (Parquet, ...).
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60931033 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java --- @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.JobException; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * This is the single (non-parallel) task, that monitors a user-procided path, and assigns splits + * to downstream tasks for further reading and processing, depending on the user-provided {@link FileSplitMonitoringFunction.WatchType}. 
+ * + * This method keeps track of which splits have already being processed by which task, and at which point --- End diff -- method? This is the class javadoc! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
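As context for the javadoc under review, the monitoring pattern it describes — periodically scanning a path and handing only newly seen files downstream — can be sketched as follows. This is an assumed shape for illustration only, not the PR's implementation, which additionally assigns `FileInputSplit`s according to the configured `WatchType`:

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MonitoringSketch {
    // Paths already handed downstream in earlier monitoring passes.
    private final Set<Path> seen = new HashSet<>();

    // One monitoring pass over the watched directory: returns only the
    // files that have not been seen before.
    List<Path> scan(Path dir) throws IOException {
        List<Path> fresh = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path p : stream) {
                if (seen.add(p)) {
                    fresh.add(p);
                }
            }
        }
        return fresh;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("watch");
        MonitoringSketch monitor = new MonitoringSketch();
        Files.createFile(dir.resolve("a.txt"));
        System.out.println(monitor.scan(dir).size()); // 1: a.txt is new
        Files.createFile(dir.resolve("b.txt"));
        System.out.println(monitor.scan(dir).size()); // 1: only b.txt is new
    }
}
```

In the real function this scan would run on a timer inside a single (non-parallel) source task, with the emitted splits distributed to the parallel readers.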
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1929#issuecomment-214387476 Can you provide a rough description as to how the FileSourceMonitoringFunction works and how it interacts with the actual formats?
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60930454 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/DefaultPathFilter.java --- @@ -0,0 +1,7 @@ +package org.apache.flink.streaming.api.functions.source; + +/** + * Created by kkloudas on 4/25/16. + */ +public class DefaultPathFilter { --- End diff -- This class is never used, as far as I can tell.
[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r60930292 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.table.test + +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.{ExecutionEnvironment, _} +import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class SortITCase( +mode: TestExecutionMode, +configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + @Test + def testOrderByDesc(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) + +val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv) + .orderBy('_1.desc) + +val expected = "3,Third\n2,Second\n1,First" +val results = t.toDataSet[Row].setParallelism(1).collect() +compareOrderedResultAsText(expected, results) + } + + @Test + def testOrderByAsc(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) + +val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv) + .orderBy('_1.asc) + +val expected = "1,First\n2,Second\n3,Third" +val results = t.toDataSet[Row].setParallelism(1).collect() +compareOrderedResultAsText(expected, results) + } + + @Test + def testOrderByMultipleFieldsDifferentDirections(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) + +env.setParallelism(2) +val t = env.fromElements((1, 3, "Third"), (1, 2, "Fourth"), (1, 4, "Second"), + (2, 1, 
"Sixth"), (1, 5, "First"), (1, 1, "Fifth")) + .toTable(tEnv).orderBy('_1.asc, '_2.desc) + +val expected = "1,5,First\n1,4,Second\n1,3,Third\n1,2,Fourth\n1,1,Fifth\n2,1,Sixth" +val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() + +implicit def rowOrdering = Ordering.by((x : Row) => (x.productElement(0).asInstanceOf[Int], + - x.productElement(1).asInstanceOf[Int])) + +val result = results.sortBy(p => p.min).reduceLeft(_ ++ _) --- End diff -- The general method to check for the globally sorted result looks good. Can you do the check for the other tests in a similar way? Setting the parallelism of the sort operator to `1` does not really test the functionality, IMO.
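The check pattern discussed above — each partition locally sorted, partitions ordered by their smallest element, then concatenated into the global order (mirroring the quoted `results.sortBy(p => p.min).reduceLeft(_ ++ _)`) — can be sketched in plain Java. The data and names here are illustrative, not taken from the test:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class GlobalSortCheckSketch {
    // Merge locally sorted, non-empty partitions into the global order by
    // sorting the partitions themselves on their smallest element and
    // concatenating them.
    static List<Integer> mergePartitions(List<List<Integer>> partitions) {
        List<List<Integer>> sorted = new ArrayList<>(partitions);
        sorted.sort(Comparator.comparing(p -> p.get(0)));
        List<Integer> merged = new ArrayList<>();
        for (List<Integer> partition : sorted) {
            merged.addAll(partition);
        }
        return merged;
    }

    public static void main(String[] args) {
        // Two range-partitioned, locally sorted partitions in arbitrary order,
        // as collect() on a parallel sort might return them.
        List<List<Integer>> partitions = Arrays.asList(
            Arrays.asList(4, 5, 6), Arrays.asList(1, 2, 3));
        System.out.println(mergePartitions(partitions)); // [1, 2, 3, 4, 5, 6]
    }
}
```

Because the merged sequence is compared against the full expected order, this exercises the sort at the test's actual parallelism rather than forcing it to 1.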
[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r60929903 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.table.test + +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.{ExecutionEnvironment, _} +import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class SortITCase( +mode: TestExecutionMode, +configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + @Test + def testOrderByDesc(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) + +val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv) + .orderBy('_1.desc) + +val expected = "3,Third\n2,Second\n1,First" +val results = t.toDataSet[Row].setParallelism(1).collect() +compareOrderedResultAsText(expected, results) + } + + @Test + def testOrderByAsc(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) + +val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv) + .orderBy('_1.asc) + +val expected = "1,First\n2,Second\n3,Third" +val results = t.toDataSet[Row].setParallelism(1).collect() +compareOrderedResultAsText(expected, results) + } + + @Test + def testOrderByMultipleFieldsDifferentDirections(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) + +env.setParallelism(2) +val t = env.fromElements((1, 3, "Third"), (1, 2, "Fourth"), (1, 4, "Second"), + (2, 1, 
"Sixth"), (1, 5, "First"), (1, 1, "Fifth")) + .toTable(tEnv).orderBy('_1.asc, '_2.desc) + +val expected = "1,5,First\n1,4,Second\n1,3,Third\n1,2,Fourth\n1,1,Fifth\n2,1,Sixth" +val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() + +implicit def rowOrdering = Ordering.by((x : Row) => (x.productElement(0).asInstanceOf[Int], --- End diff -- Can you change the `Ordering` such that it does not rely on the actual data? (A value (2, 20, "Seventh") would not be sorted correctly.)
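A data-independent variant of the requested ordering compares the second field in reverse instead of negating it. A minimal sketch, modelling rows as plain `Object[]` rather than the Table API's `Row`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class RowOrderingSketch {
    // First field ascending, second field descending, compared pairwise
    // rather than by negating values (negation breaks on Integer.MIN_VALUE
    // and ties the ordering to the test data's value range).
    static final Comparator<Object[]> ORDERING = (a, b) -> {
        int first = Integer.compare((Integer) a[0], (Integer) b[0]); // ascending
        return first != 0
            ? first
            : Integer.compare((Integer) b[1], (Integer) a[1]);       // descending
    };

    public static void main(String[] args) {
        // Rows modelled as Object[] {Integer, Integer, String}.
        List<Object[]> rows = new ArrayList<>(Arrays.asList(
            new Object[]{1, 3, "Third"}, new Object[]{1, 2, "Fourth"},
            new Object[]{1, 4, "Second"}, new Object[]{2, 1, "Sixth"},
            new Object[]{1, 5, "First"}, new Object[]{1, 1, "Fifth"}));
        rows.sort(ORDERING);
        for (Object[] r : rows) {
            System.out.println(r[0] + "," + r[1] + "," + r[2]);
        }
    }
}
```

On the test's sample data this produces the same `1,5,First … 2,1,Sixth` order as the quoted `expected` string, but it also orders a row like (2, 20, "Seventh") correctly.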
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60929807 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionTest.java --- @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.commons.io.IOUtils; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +public class FileSplitMonitoringFunctionTest { + + private static final int NO_OF_FILES = 10; + private static final int LINES_PER_FILE = 10; + + private static final long INTERVAL = 200; + + private File baseDir; + + private org.apache.hadoop.fs.FileSystem hdfs; + private String hdfsURI; + private MiniDFSCluster hdfsCluster; + + private Set hdPaths = new HashSet<>(); + private Set hdPathNames = new HashSet<>(); + private MaphdPathContents = new HashMap<>(); + + // PREPARING FOR THE TESTS + + @Before + public void createHDFS() { + try { + baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile(); + FileUtil.fullyDelete(baseDir); + + org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration(); + hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); + hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set. 
+ + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf); + hdfsCluster = builder.build(); + + hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/"; + hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf); + + } catch(Throwable e) { + e.printStackTrace(); + Assert.fail("Test failed " + e.getMessage()); + } + } + + @After + public void destroyHDFS() { + try { + for(org.apache.hadoop.fs.Path file: hdPaths) { + hdfs.delete(file, false); + } + FileUtil.fullyDelete(baseDir); + hdfsCluster.shutdown(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + // END OF PREPARATIONS + + // TESTS + + @Test + public void testFileContents() throws IOException { + // validates the output + for (org.apache.hadoop.fs.Path file : hdPaths) { +
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60929665 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionTest.java --- @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.source; + +import org.apache.commons.io.IOUtils; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +public class FileSplitMonitoringFunctionTest { + + private static final int NO_OF_FILES = 10; + private static final int LINES_PER_FILE = 10; + + private static final long INTERVAL = 200; + + private File baseDir; + + private org.apache.hadoop.fs.FileSystem hdfs; + private String hdfsURI; + private MiniDFSCluster hdfsCluster; + + private Set hdPaths = new HashSet<>(); + private Set hdPathNames = new HashSet<>(); + private MaphdPathContents = new HashMap<>(); + + // PREPARING FOR THE TESTS + + @Before + public void createHDFS() { + try { + baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile(); + FileUtil.fullyDelete(baseDir); + + org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration(); + hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); + hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set. 
+ + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf); + hdfsCluster = builder.build(); + + hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/"; + hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf); + + } catch(Throwable e) { + e.printStackTrace(); + Assert.fail("Test failed " + e.getMessage()); + } + } + + @After + public void destroyHDFS() { + try { + for(org.apache.hadoop.fs.Path file: hdPaths) { + hdfs.delete(file, false); + } + FileUtil.fullyDelete(baseDir); + hdfsCluster.shutdown(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + // END OF PREPARATIONS + + // TESTS + + @Test + public void testFileContents() throws IOException { + // validates the output + for (org.apache.hadoop.fs.Path file : hdPaths) { +
[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r60929272 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.table.test + +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.{ExecutionEnvironment, _} +import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class SortITCase( +mode: TestExecutionMode, +configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + @Test + def testOrderByDesc(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) + +val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv) + .orderBy('_1.desc) + +val expected = "3,Third\n2,Second\n1,First" +val results = t.toDataSet[Row].setParallelism(1).collect() +compareOrderedResultAsText(expected, results) + } + + @Test + def testOrderByAsc(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) + +val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv) + .orderBy('_1.asc) + +val expected = "1,First\n2,Second\n3,Third" +val results = t.toDataSet[Row].setParallelism(1).collect() +compareOrderedResultAsText(expected, results) + } + + @Test + def testOrderByMultipleFieldsDifferentDirections(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) + +env.setParallelism(2) +val t = env.fromElements((1, 3, "Third"), (1, 2, "Fourth"), (1, 4, "Second"), + (2, 1, 
"Sixth"), (1, 5, "First"), (1, 1, "Fifth")) + .toTable(tEnv).orderBy('_1.asc, '_2.desc) + +val expected = "1,5,First\n1,4,Second\n1,3,Third\n1,2,Fourth\n1,1,Fifth\n2,1,Sixth" +val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect() + +implicit def rowOrdering = Ordering.by((x : Row) => (x.productElement(0).asInstanceOf[Int], + - x.productElement(1).asInstanceOf[Int])) + +val result = results.sortBy(p => p.min).reduceLeft(_ ++ _) + +compareOrderedResultAsText(expected, result) + } + + @Test + def testOrderByMultipleFieldsWithSql(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) + +val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC, _2 DESC" + +val t = env.fromElements((1, 1, "First"), (2, 3, "Fourth"), (1, 2, "Second"), + (2, 1, "Third")).toTable(tEnv) +tEnv.registerDataSet("MyTable", t) + +val queryResult = tEnv.sql(sqlQuery) + +val expected = "2,3,Fourth\n2,1,Third\n1,2,Second\n1,1,First" +val results = queryResult.toDataSet[Row].setParallelism(1).collect() +compareOrderedResultAsText(expected, results) + } + + private def compareOrderedResultAsText[T](expected: String, results: Seq[T]) = { --- End diff -- If we always use `toDataSet[Row]` and respect
[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r60929017 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.table.test + +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.{ExecutionEnvironment, _} +import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class SortITCase( +mode: TestExecutionMode, +configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + @Test + def testOrderByDesc(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) + +val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv) + .orderBy('_1.desc) + +val expected = "3,Third\n2,Second\n1,First" +val results = t.toDataSet[Row].setParallelism(1).collect() --- End diff -- I think this changes the parallelism of the sort operator to `1`.
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60928894 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionITCase.java --- @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class FileSplitMonitoringFunctionITCase extends StreamingProgramTestBase { + + private static final int NO_OF_FILES = 10; + private static final int LINES_PER_FILE = 10; + + private static final long INTERVAL = 200; + + private File baseDir; + + private org.apache.hadoop.fs.FileSystem hdfs; + private String hdfsURI; + private MiniDFSCluster hdfsCluster; --- End diff -- Is it really necessary to spin up a DFS cluster for this test? It should work just as well with local files, no?
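The local-file alternative suggested in the comment above could look roughly like the following sketch — JDK temp directories instead of a `MiniDFSCluster`. File names and line counts here are illustrative, not the PR's fixtures:

```java
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

public class LocalFileFixtureSketch {
    // Create noOfFiles local test files, each with linesPerFile lines, and
    // return the directory that a "file://"-based test could monitor.
    static Path createFixture(int noOfFiles, int linesPerFile) throws IOException {
        Path baseDir = Files.createTempDirectory("monitoring-test");
        for (int i = 0; i < noOfFiles; i++) {
            Path file = baseDir.resolve("file-" + i);
            try (PrintWriter out = new PrintWriter(file.toFile())) {
                for (int line = 0; line < linesPerFile; line++) {
                    out.println("file " + i + " line " + line);
                }
            }
        }
        return baseDir;
    }

    public static void main(String[] args) throws IOException {
        Path dir = createFixture(10, 10);
        // The monitored URI would then be "file://" + dir; no cluster needed.
        try (Stream<Path> listing = Files.list(dir)) {
            System.out.println(listing.count()); // 10
        }
    }
}
```

This avoids the MiniDFSCluster startup cost and the Hadoop test dependency, at the price of not exercising HDFS-specific behavior.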
[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r60928727 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.table.test + +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.{ExecutionEnvironment, _} +import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class SortITCase( +mode: TestExecutionMode, +configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + @Test + def testOrderByDesc(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment --- End diff -- Can you set the parallelism of the environment to 4 to ensure that the tests are executed in parallel?
[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1911#discussion_r60928569 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis; + +import com.amazonaws.regions.Regions; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; +import org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType; +import org.apache.flink.streaming.connectors.kinesis.config.InitialPosition; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Properties; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The Flink Kinesis Consumer is a parallel streaming data source that pulls data from multiple AWS Kinesis streams + 
* within the same AWS service region. Each instance of the consumer is responsible for fetching data records from + * one or more Kinesis shards. + * + * To leverage Flink's checkpointing mechanics for exactly-once stream processing guarantees, the Flink Kinesis + * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for + * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial + * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST: + * + * TRIM_HORIZON: start reading from the earliest record still available in each shard. + * LATEST: start reading from the most recent record, i.e. only data added after the consumer starts. + * + * NOTE: The current implementation does not correctly handle resharding of AWS Kinesis streams. + * NOTE: Since Kinesis and Kafka share many common abstractions, the implementation is heavily based on + * the Flink Kafka Consumer. + * + * @param <T> the type of data emitted + */ +public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> + implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class); + + // + // Consumer properties + // + + /** The complete list of shards */ + private final List<KinesisStreamShard> shards; + + /** Properties to
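Given the configuration types imported above (`KinesisConfigConstants`, `CredentialProviderType`, `InitialPosition`), wiring up the consumer presumably starts from a `Properties` object. The sketch below illustrates that shape only; the literal property keys and the commented-out constructor call are assumptions for illustration, not the connector's confirmed API.

```java
import java.util.Properties;

// Hypothetical configuration sketch for the Kinesis consumer described above.
// The literal keys are illustrative stand-ins, not confirmed
// KinesisConfigConstants values.
public class KinesisConsumerConfigSketch {

    public static Properties buildConfig() {
        Properties props = new Properties();
        props.setProperty("aws.region", "us-east-1");                 // assumed key
        props.setProperty("aws.credentials.provider", "BASIC");       // assumed key
        props.setProperty("stream.initial.position", "TRIM_HORIZON"); // assumed key: TRIM_HORIZON or LATEST
        // In a real job the properties would then be handed to the consumer, e.g.:
        // env.addSource(new FlinkKinesisConsumer<>("myStream", deserializationSchema, props));
        return props;
    }
}
```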
[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r60928598 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.table.test + +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.{ExecutionEnvironment, _} +import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class SortITCase( +mode: TestExecutionMode, +configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + @Test + def testOrderByDesc(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) + +val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv) --- End diff -- I would use more than three records to test the sorting. Can you use the `CollectionDataSets.get3TupleDataSet()` here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60928614 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java --- @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.JobException; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * This is the single (non-parallel) task that monitors a user-provided path, and assigns splits + * to downstream tasks for further reading and processing, depending on the user-provided {@link FileSplitMonitoringFunction.WatchType}.
+ * + * This class keeps track of which splits have already been processed by which task, and at which point + * in the file we are currently processing, at the granularity of the split. In addition, it keeps track + * of the last modification time for each file, so that it can detect new data. + */ +public class FileSplitMonitoringFunction + extends RichSourceFunction { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class); + + /** +* Specifies when computation will be triggered. This can be currently done in 3 ways. +* {@code ONLY_NEW_FILES} which implies that only new files will be processed. +* {@code REPROCESS_WITH_APPENDED} which reprocesses the whole file, as soon as new data is appended to it. +*/ + public enum WatchType { --- End diff -- where is ONLY_NEW_FILES? ---
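The reviewer's question arises because the quoted Javadoc mentions {@code ONLY_NEW_FILES} while the enum body only defines {@code REPROCESS_WITH_APPENDED}. A self-contained sketch of the enum as the Javadoc describes it follows; the {@code ONLY_NEW_FILES} constant and the helper method are the reviewer's reading of the documentation, not committed code from the PR.

```java
// Sketch of the WatchType enum as the quoted Javadoc describes it; the PR
// itself only defines REPROCESS_WITH_APPENDED, so ONLY_NEW_FILES here is an
// assumption based on the documentation text.
public class WatchTypeSketch {

    public enum WatchType {
        ONLY_NEW_FILES,          // process a file only once, when it first appears
        REPROCESS_WITH_APPENDED  // reprocess the whole file whenever data is appended
    }

    /** Decides whether a file should be (re)emitted under the given watch type. */
    public static boolean shouldProcess(WatchType type, boolean seenBefore, boolean modified) {
        switch (type) {
            case ONLY_NEW_FILES:
                return !seenBefore;
            case REPROCESS_WITH_APPENDED:
                return !seenBefore || modified;
            default:
                throw new IllegalArgumentException("Unknown watch type: " + type);
        }
    }
}
```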
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60928509 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java --- @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.JobException; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * This is the single (non-parallel) task that monitors a user-provided path, and assigns splits + * to downstream tasks for further reading and processing, depending on the user-provided {@link FileSplitMonitoringFunction.WatchType}.
+ * + * This class keeps track of which splits have already been processed by which task, and at which point + * in the file we are currently processing, at the granularity of the split. In addition, it keeps track + * of the last modification time for each file, so that it can detect new data. + */ +public class FileSplitMonitoringFunction + extends RichSourceFunction { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class); + + /** +* Specifies when computation will be triggered. This can be currently done in 3 ways. --- End diff -- I would remove the ```This can be currently done in 3 ways.``` part. It doesn't really add anything to the description, and is likely to become outdated. ---
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60928251 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.Path; + +import java.io.Serializable; + +public interface FilePathFilter extends Serializable { + + /** +* @return {@code true} if the {@code filePath} given is to be +* ignored when processing a directory, e.g. +* +* {@code +* +* public boolean filterPaths(Path filePath) { +* return filePath.getName().startsWith(".") || filePath.getName().contains("_COPYING_"); +* } +* } +* */ + boolean filterPaths(Path filePath); + + + /** +* The default file path filtering method and is used +* if no other such function is provided. This filter leaves out +* files starting with ".", "_", and "_COPYING_". 
+* */ + public class DefaultFilter implements FilePathFilter { + + private static DefaultFilter instance = null; + + DefaultFilter() { + --- End diff -- unnecessary empty line ---
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60928155 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.Path; + +import java.io.Serializable; + +public interface FilePathFilter extends Serializable { + + /** +* @return {@code true} if the {@code filePath} given is to be +* ignored when processing a directory, e.g. +* +* {@code +* +* public boolean filterPaths(Path filePath) { +* return filePath.getName().startsWith(".") || filePath.getName().contains("_COPYING_"); +* } +* } +* */ + boolean filterPaths(Path filePath); --- End diff -- IMO this method should be called ```filterPath``` since it only receives a single path. ---
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60928227 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.core.fs.Path; + +import java.io.Serializable; + +public interface FilePathFilter extends Serializable { + + /** +* @return {@code true} if the {@code filePath} given is to be +* ignored when processing a directory, e.g. +* +* {@code +* +* public boolean filterPaths(Path filePath) { +* return filePath.getName().startsWith(".") || filePath.getName().contains("_COPYING_"); +* } +* } +* */ + boolean filterPaths(Path filePath); + + + /** +* The default file path filtering method and is used +* if no other such function is provided. This filter leaves out +* files starting with ".", "_", and "_COPYING_". 
+* */ + public class DefaultFilter implements FilePathFilter { + + private static DefaultFilter instance = null; + + DefaultFilter() { + + } + + public static DefaultFilter getInstance() { + if(instance == null) { --- End diff -- missing space after if ---
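Pulling the review comments on this filter together (rename to `filterPath`, drop the stray empty line, fix the `if` spacing), a cleaned-up version might look like the sketch below. The String-based signature replaces Flink's `Path` so the sketch stays self-contained, and the eager singleton is one possible way to sidestep the lazy-init style commented on above; neither choice is the PR's confirmed design.

```java
// Self-contained sketch of the default path filter with the review feedback
// applied: renamed to filterPath, eager singleton, cleaned-up formatting.
// Uses a plain String instead of org.apache.flink.core.fs.Path so it runs
// without Flink on the classpath.
public final class DefaultFilterSketch {

    private static final DefaultFilterSketch INSTANCE = new DefaultFilterSketch();

    private DefaultFilterSketch() {}

    public static DefaultFilterSketch getInstance() {
        return INSTANCE;
    }

    /** Returns true if the file should be ignored, mirroring the quoted Javadoc:
     *  files starting with "." or "_", or containing "_COPYING_", are left out. */
    public boolean filterPath(String fileName) {
        return fileName.startsWith(".")
            || fileName.startsWith("_")
            || fileName.contains("_COPYING_");
    }
}
```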
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60928005 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/DefaultPathFilter.java --- @@ -0,0 +1,7 @@ +package org.apache.flink.streaming.api.functions.source; + +/** + * Created by kkloudas on 4/25/16. --- End diff -- did you mean to include this? ---
[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r60928086 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import java.util + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelFieldCollation.Direction +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{RelCollation, RelNode, SingleRel} +import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.java.operators.{DataSource, Operator, PartitionOperator} +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig} +import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.api.table.typeutils.TypeConverter._ + +import scala.collection.JavaConverters._ + +class DataSetSort( +cluster: RelOptCluster, +traitSet: RelTraitSet, +inp: RelNode, +collations: RelCollation, +rowType: RelDataType) + extends SingleRel(cluster, traitSet, inp) + with DataSetRel{ + + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode ={ +new DataSetSort( + cluster, + traitSet, + inputs.get(0), + collations, + rowType +) + } + + override def translateToPlan( + tableEnv: BatchTableEnvironment, + expectedType: Option[TypeInformation[Any]] = None): DataSet[Any] = { --- End diff -- We are ignoring the `expectedType`. The convention here is to return a `DataSet` of that type. If the input type (`inputDS.getType()`) is not equal to the expected type, we need to add a Map function after the sort, which translates the records into the expected type. See `DataSetSource` for an example of how to do that. ---
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60927886 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala --- @@ -458,7 +458,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * every 100 milliseconds. * */ - def readFileStream(StreamPath: String, intervalMillis: Long = 100, --- End diff -- unrelated change ---
[GitHub] flink pull request: Refactoring the File Monitoring Source.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60927783 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java --- @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.JobException; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * This is the single (non-parallel) task that monitors a user-provided path, and assigns splits + * to downstream tasks for further reading and processing, depending on the user-provided {@link FileSplitMonitoringFunction.WatchType}.
+ * + * This class keeps track of which splits have already been processed by which task, and at which point + * in the file we are currently processing, at the granularity of the split. In addition, it keeps track + * of the last modification time for each file, so that it can detect new data. + */ +public class FileSplitMonitoringFunction + extends RichSourceFunction { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FileSplitMonitoringFunction.class); + + /** +* Specifies when computation will be triggered. This can be currently done in 3 ways. +* {@code ONLY_NEW_FILES} which implies that only new files will be processed. +* {@code REPROCESS_WITH_APPENDED} which reprocesses the whole file, as soon as new data is appended to it. +*/ + public enum WatchType { + REPROCESS_WITH_APPENDED // Reprocesses the whole file when new data is appended. + } + + /** The path to monitor. */ + private final String path; + + /** The default parallelism for the job, as this is going to be the parallelism of the downstream readers. */ + private final int readerParallelism; + + /** The {@link FileInputFormat} to be read. */ + private FileInputFormat format; + + /** How often to monitor the state of the directory for new data. */ + private final long interval; + + /** Which new data to process (see {@link WatchType}).
*/ + private final WatchType watchType; + + private long globalModificationTime; + + private FilePathFilter pathFilter; + + private volatile boolean isRunning = true; + + public FileSplitMonitoringFunction( + FileInputFormat format, String path, + WatchType watchType, int readerParallelism, long interval) { + + this(format, path, FilePathFilter.DefaultFilter.getInstance(), watchType, readerParallelism, interval); + } + + public FileSplitMonitoringFunction( + FileInputFormat format, String path, FilePathFilter filter, + WatchType watchType, int readerParallelism, long interval) { + + this.format = Preconditions.checkNotNull(format); + this.path = Preconditions.checkNotNull(path); + + Preconditions.checkArgument(interval >= 100, + "The specified monitoring interval is smaller than the minimum allowed one (100 ms)."); + this.interval = interval; + + this.watchType = watchType; + + this.pathFilter = Preconditions.checkNotNull(filter); + +
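The `globalModificationTime` field in the quoted constructor hints at the core bookkeeping: remember a watermark and report only files modified after it. The stand-alone sketch below illustrates that idea; file listing is abstracted as a name-to-modification-time map so it needs no real file system, whereas the PR's implementation uses Flink's `FileSystem`/`FileStatus` APIs. The class and method names here are illustrative, not the PR's.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Minimal sketch of modification-time-based change detection: a global
// watermark advances on every scan, and only files newer than it are
// reported. An assumption-based illustration, not the PR's actual code.
public class ModificationTimeMonitorSketch {

    private long globalModificationTime = Long.MIN_VALUE;

    /** Returns files modified since the last scan and advances the watermark. */
    public List<String> scan(Map<String, Long> fileModTimes) {
        List<String> newOrChanged = new ArrayList<>();
        long maxSeen = globalModificationTime;
        for (Map.Entry<String, Long> e : fileModTimes.entrySet()) {
            if (e.getValue() > globalModificationTime) {
                newOrChanged.add(e.getKey());
                maxSeen = Math.max(maxSeen, e.getValue());
            }
        }
        globalModificationTime = maxSeen;
        return newOrChanged;
    }
}
```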
[jira] [Commented] (FLINK-3806) Revert use of DataSet.count() in Gelly
[ https://issues.apache.org/jira/browse/FLINK-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15256433#comment-15256433 ] Aljoscha Krettek commented on FLINK-3806: - Correct, I think the {{count()}}/{{collect()}} methods are somewhat dangerous as long as we recompute them every time. > Revert use of DataSet.count() in Gelly > -- > > Key: FLINK-3806 > URL: https://issues.apache.org/jira/browse/FLINK-3806 > Project: Flink > Issue Type: Improvement > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Priority: Critical > > FLINK-1632 replaced {{GraphUtils.count}} with {{DataSetUtils.count}}. The > former returns a {{DataSet}} while the latter executes a job to return a Java > value. > {{DataSetUtils.count}} is called from {{Graph.numberOfVertices}} and > {{Graph.numberOfEdges}} which are called from {{GatherSumApplyIteration}} and > {{ScatterGatherIteration}} as well as the {{PageRank}} algorithms when the > user does not pass the number of vertices as a parameter. > As noted in FLINK-1632, this does make the code simpler but if my > understanding is correct will materialize the Graph twice. The Graph will > need to be reread from input, regenerated, or recomputed by preceding > algorithms. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
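The danger described above is that an eager `count()`/`collect()` executes a job each time it is called, so the graph is rematerialized on every use, while keeping the count lazy (or caching it) pays the cost once. A toy pure-Java model of that difference, with an evaluation counter standing in for job executions (names and the memoization scheme are illustrative, not Flink's actual mechanics):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Models why eager count() is "dangerous": every call re-runs the simulated
// job, while a memoized supplier computes it once and caches the result.
public class CountRecomputationSketch {

    public static final AtomicInteger EVALUATIONS = new AtomicInteger();

    /** Simulates an expensive job, like DataSetUtils.count() executing the plan. */
    public static long expensiveCount(List<Integer> data) {
        EVALUATIONS.incrementAndGet();
        return data.stream().count();
    }

    /** Memoizes the first result, like carrying the count inside the plan. */
    public static Supplier<Long> memoizedCount(final List<Integer> data) {
        return new Supplier<Long>() {
            private Long cached;
            @Override
            public Long get() {
                if (cached == null) {
                    cached = expensiveCount(data);
                }
                return cached;
            }
        };
    }
}
```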
[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r60927218 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import java.util + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelFieldCollation.Direction +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{RelCollation, RelNode, SingleRel} +import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.java.operators.{DataSource, Operator, PartitionOperator} +import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig} +import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.api.table.typeutils.TypeConverter._ + +import scala.collection.JavaConverters._ + +class DataSetSort( +cluster: RelOptCluster, +traitSet: RelTraitSet, +inp: RelNode, +collations: RelCollation, +rowType: RelDataType) + extends SingleRel(cluster, traitSet, inp) + with DataSetRel{ + + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode ={ +new DataSetSort( + cluster, + traitSet, + inputs.get(0), + collations, + rowType +) + } + + override def translateToPlan( + tableEnv: BatchTableEnvironment, + expectedType: Option[TypeInformation[Any]] = None): DataSet[Any] = { + +val config = tableEnv.getConfig + +val inputDS = wrapDataSet(inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv)) + +val returnType = determineReturnType( + getRowType, + expectedType, + config.getNullCheck, + config.getEfficientTypeUsage +) + +val fieldCollations = collations.getFieldCollations.asScala + .map(c => (c.getFieldIndex, directionToOrder(c.getDirection))) + +val currentParallelism = if (inputDS.getParallelism >= 1) { + inputDS.getParallelism +} else { + inputDS.getExecutionEnvironment.getParallelism +} + +var partitionedDs = if (currentParallelism == 1) { + inputDS.setParallelism(1) +} else { + wrapDataSet(inputDS.partitionByRange(fieldCollations.map(_._1): 
_*).javaSet +.asInstanceOf[PartitionOperator[Any]] +.withOrders(fieldCollations.map(_._2): _*)) +} + +fieldCollations.foreach { fieldCollation => + partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2) +} + +partitionedDs.javaSet + } + + private def directionToOrder(direction: Direction) = { +direction match { + case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING + case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING +} + + } + + private def wrapDataSet(dataSet: DataSet[Any]) = { --- End diff -- We do not need to wrap the Java `DataSet` into a Scala `DataSet`. The other `DataSetRel`s are also only dealing with the Java `DataSet`. This also means we do not need to unwrap it with `javaSet`. ---
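The strategy in the quoted `translateToPlan` is: range-partition the data on the sort keys, then sort each partition locally (`sortPartition`), so that reading the partitions in range order yields a total order. A plain-Java simulation of that two-step idea, with a single pivot standing in for Flink's range partitioner (all names here are illustrative):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simulation of the DataSetSort strategy: range-partition on the sort key,
// sort each partition locally, concatenate in range order. The pivot-based
// partitioner is a stand-in for Flink's partitionByRange.
public class RangeSortSketch {

    /** Splits values into a "below pivot" and an "at/above pivot" partition. */
    public static List<List<Integer>> rangePartition(List<Integer> values, int pivot) {
        List<Integer> low = new ArrayList<>();
        List<Integer> high = new ArrayList<>();
        for (int v : values) {
            (v < pivot ? low : high).add(v);
        }
        List<List<Integer>> partitions = new ArrayList<>();
        partitions.add(low);
        partitions.add(high);
        return partitions;
    }

    /** Sorts each partition locally and concatenates: a globally sorted result. */
    public static List<Integer> sortPartitions(List<List<Integer>> partitions) {
        List<Integer> result = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            Collections.sort(partition);   // the "sortPartition" step
            result.addAll(partition);
        }
        return result;
    }
}
```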
[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r60926920

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala ---
@@ -0,0 +1,108 @@
[...]
+    val fieldCollations = collations.getFieldCollations.asScala
+      .map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))
+
+    val currentParallelism = if (inputDS.getParallelism >= 1) {
--- End diff --

I am not sure about the handling of the parallelism here. I would only check whether the default parallelism of the environment is set to 1. If it is not, we add a range partition operator; if it is, we don't.

Parallelism is not really handled by the Table API at the moment. This is something that we need to improve in the future, but not as part of this PR, IMO.
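A hedged sketch of the check the reviewer proposes (the names and surrounding structure are illustrative, not taken from the PR; `inputDS` and `fieldCollations` are the values from the quoted `translateToPlan`):

```scala
// Sketch: decide on range partitioning from the environment's default
// parallelism alone, instead of inspecting the input operator's parallelism.
val envParallelism = inputDS.getExecutionEnvironment.getParallelism

val partitionedDs = if (envParallelism == 1) {
  // Single partition: a local sortPartition already yields a total order.
  inputDS
} else {
  // Multiple partitions: range-partition on the sort keys first so that
  // the subsequent per-partition sorts produce a globally sorted result.
  inputDS.partitionByRange(fieldCollations.map(_._1): _*)
    .asInstanceOf[PartitionOperator[Any]]
    .withOrders(fieldCollations.map(_._2): _*)
}
```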
[GitHub] flink pull request: Watch type proper
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1929#discussion_r60926922

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task, that monitors a user-procided path, and assigns splits
--- End diff --

typo: procided -> provided
[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r60926405

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala ---
@@ -0,0 +1,108 @@
[...]
+    var partitionedDs = if (currentParallelism == 1) {
+      inputDS.setParallelism(1)
--- End diff --

I don't think we should set the parallelism of the input operator to `1`.
[GitHub] flink pull request: Watch type proper
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1929#issuecomment-214373041

What's with the PR title? :confused:
[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r60926030

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---
@@ -401,6 +400,52 @@ class Table(
     new Table(relBuilder.build(), tableEnv)
   }

+  /**
+    * Sorts the given [[Table]]. Similar to SQL ORDER BY. The sorting is performed locally across
+    * all partitions with keys equal to the given fields.
+    *
+    * Example:
+    *
+    * {{{
+    *   tab.orderBy('name)
--- End diff --

Add `desc` to the example to show how it is used.
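A minimal sketch of what the amended Scaladoc example might show (assuming a `desc` expression suffix on sort fields; the column names are hypothetical):

```scala
// Hypothetical orderBy example incorporating the reviewer's suggestion:
// 'name.desc sorts the first key in descending order, plain 'id ascending.
tab.orderBy('name.desc, 'id)
```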
[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r60925922

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---
@@ -401,6 +400,52 @@ class Table(
     new Table(relBuilder.build(), tableEnv)
   }

+  /**
+    * Sorts the given [[Table]]. Similar to SQL ORDER BY. The sorting is performed locally across
--- End diff --

I think the following sentence can be confusing:

> The sorting is performed locally across all partitions with keys equal to the given fields.

How about:

> The resulting Table is globally sorted across all parallel partitions.
[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60925702

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java ---
@@ -99,9 +116,25 @@
 			null,
 			false);

+		return flatSelect(patternFlatSelectFunction, outTypeInfo);
+	}
+
+	/**
+	 * Applies a flat select function to the detected pattern sequence. For each pattern sequence
+	 * the provided {@link PatternFlatSelectFunction} is called. The pattern flat select function
+	 * can produce an arbitrary number of resulting elements.
+	 *
+	 * @param patternFlatSelectFunction The pattern flat select function which is called for each
+	 *                                  detected pattern sequence.
+	 * @param <R> Type of the resulting elements
+	 * @param outTypeInfo Explicit specification of output type.
+	 * @return {@link DataStream} which contains the resulting elements from the pattern flat select
+	 *         function.
+	 */
+	public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) {
 		return patternStream.flatMap(
-			new PatternFlatSelectMapper<T, R>(
-				patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction)
+			new PatternFlatSelectMapper<>(
+					patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction)
--- End diff --

Indentation is off here
[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60925320

--- Diff: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabiliyTest.scala ---
@@ -0,0 +1,88 @@
[...]
+ */
+package org.apache.flink.cep.scala
+
+import org.apache.flink.api.common.functions.util.ListCollector
+import org.apache.flink.cep.scala.pattern.Pattern
+import org.apache.flink.streaming.api.operators.{StreamFlatMap, StreamMap}
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.util.{Collector, TestLogger}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import org.junit.Assert._
+import org.junit.Test
+
+class PatternStreamScalaJavaAPIInteroperabiliyTest extends TestLogger {
+
+
+  @Test
+  @throws[Exception]
+  def testScalaJavaAPISelectFunForwarding {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+    val dummyDataStream: DataStream[(Int, Int)] = env.fromElements()
+    val pattern: Pattern[(Int, Int), _] = Pattern.begin[(Int, Int)]("dummy")
+    val pStream: PatternStream[(Int, Int)] = CEP.pattern(dummyDataStream, pattern)
+    val param = mutable.Map("begin" -> (1, 2)).asJava
+    val result: DataStream[(Int, Int)] = pStream
+      .select((pattern: mutable.Map[String, (Int, Int)]) => {
+        //verifies input parameter forwarding
+        assertEquals(param, pattern.asJava)
+        param.get("begin")
+      })
+    val out = extractUserFunction[StreamMap[java.util.Map[String, (Int, Int)], (Int, Int)]](result)
+      .getUserFunction.map(param)
+    //verifies output parameter forwarding
+    assertEquals(param.get("begin"), out)
+  }
+
+  @Test
+  @throws[Exception]
+  def testScalaJavaAPIFlatSelectFunForwarding {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+    val dummyDataStream: DataStream[List[Int]] = env.fromElements()
+    val pattern: Pattern[List[Int], _] = Pattern.begin[List[Int]]("dummy")
+    val pStream: PatternStream[List[Int]] = CEP.pattern(dummyDataStream, pattern)
+    val inList = List(1, 2, 3)
+    val inParam = mutable.Map("begin" -> inList).asJava
+    val outList = new java.util.ArrayList[List[Int]]
+    val outParam = new ListCollector[List[Int]](outList)
+
+    val result: DataStream[List[Int]] = pStream
+
+      .flatSelect((pattern: mutable.Map[String, List[Int]], out: Collector[List[Int]]) => {
+        //verifies input parameter forwarding
+        assertEquals(inParam, pattern.asJava)
+        out.collect(pattern.get("begin").get)
+      })
+
+    extractUserFunction[StreamFlatMap[java.util.Map[String, List[Int]], List[Int]]](result).
+      getUserFunction.flatMap(inParam, outParam)
+    //verify output parameter forwarding and that flatMap function was actually called
+    assertEquals(inList, outList.get(0))
+  }
+
+  def extractUserFunction[T](dataStream: DataStream[_]) = {
+    dataStream.javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[_, _]]
+      .getOperator
+      .asInstanceOf[T]
+  }
--- End diff --

Good tests :-)
[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60925045

--- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala ---
@@ -0,0 +1,129 @@
[...]
+ */
+package org.apache.flink.cep.scala
+
+import java.util.{Map => JMap}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction,
+PatternStream => JPatternStream}
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.scala.asScalaStream
+import org.apache.flink.util.Collector
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+ * Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected
+ * pattern sequences as a map of events associated with their names. The pattern is detected using
+ * a [[org.apache.flink.cep.nfa.NFA]]. In order to process the detected sequences, the user has to
+ * specify a [[PatternSelectFunction]] or a [[PatternFlatSelectFunction]].
+ *
+ * @param jPatternStream Underlying pattern stream from Java API
+ * @tparam T Type of the events
+ */
+class PatternStream[T: TypeInformation](jPatternStream: JPatternStream[T]) {
--- End diff --

`TypeInformation` is not needed
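A minimal sketch of the change the reviewer suggests (hypothetical: the wrapper only delegates to the Java stream, so no `TypeInformation` needs to be captured at construction; methods that build new streams can request it implicitly per call instead):

```scala
// Sketch: drop the context bound from the class and keep the Java
// PatternStream as-is; the accessor name is illustrative.
class PatternStream[T](jPatternStream: JPatternStream[T]) {

  private[flink] def wrappedPatternStream: JPatternStream[T] = jPatternStream
}
```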
[jira] [Commented] (FLINK-3808) Refactor the whole file monitoring source to take a fileInputFormat as an argument.
[ https://issues.apache.org/jira/browse/FLINK-3808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15256418#comment-15256418 ] Kostas Kloudas commented on FLINK-3808: --- PR https://github.com/apache/flink/pull/1929 addresses this. > Refactor the whole file monitoring source to take a fileInputFormat as an > argument. > --- > > Key: FLINK-3808 > URL: https://issues.apache.org/jira/browse/FLINK-3808 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > > This issue is just an intermediate step towards making the file source > fault-tolerant. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60924860

--- Diff: flink-libraries/flink-cep-scala/pom.xml ---
@@ -0,0 +1,209 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-libraries</artifactId>
+    <version>1.1-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>flink-cep-scala_2.10</artifactId>
+  <name>flink-cep-scala</name>
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-scala_2.10</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-clients_2.10</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-cep_2.10</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_2.10</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-scala_2.10</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-tests_2.10</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-test-utils_2.10</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_2.10</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
--- End diff --

I think `flink-streaming-java_2.10` is not needed as a dependency.
[GitHub] flink pull request: Watch type proper
Github user kl0u commented on the pull request: https://github.com/apache/flink/pull/1929#issuecomment-214366444

Addresses FLINK-3808.
[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r60924404

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSort}
+
+class DataSetSortRule
--- End diff --

Our implementation of `DataSetSort` does not support offset and limit (which is fine and can be added later, IMO). Consequently, we can only translate `LogicalSort` into `DataSetSort` if `offset` and `limit` are both null. We need to override the method `matches` and only return `true` if `offset` and `limit` are null. See `DataSetJoinRule` for instance, which checks if the join type is `JoinType.INNER`.
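The suggested guard could look roughly like this sketch (hedged: the rule boilerplate is assumed, not taken from the PR; `offset` and `fetch` are the fields Calcite's `Sort`/`LogicalSort` uses for OFFSET and LIMIT):

```scala
// Sketch of the matches() override the reviewer asks for: only fire the rule
// when the LogicalSort carries neither an OFFSET nor a LIMIT (fetch) clause.
override def matches(call: RelOptRuleCall): Boolean = {
  val sort = call.rel(0).asInstanceOf[LogicalSort]
  // Calcite stores OFFSET in `offset` and LIMIT in `fetch`; both must be absent.
  sort.offset == null && sort.fetch == null
}
```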
[GitHub] flink pull request: Watch type proper
GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/1929

Watch type proper

This PR is for FLINK-3808. It refactors the FileMonitoring sources to take a FileInputFormat as an argument and to work at the granularity of a split, not at that of a file. It still does not change the API calls, which still call the old code. This will come in a following PR.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink watch_type_proper

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1929.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1929

commit f3b96b72eae877002fcd5f4699575d0e7cd928d4
Author: kl0u  Date: 2016-04-18T14:37:54Z
First commit in implementing the Monitor.

commit c4f6f8c94b8501853a9fb06097b18fc378ea62ec
Author: kl0u  Date: 2016-04-19T11:07:08Z
ToRemove

commit 2405a2131a84d42fac25d34fb5c32d30a5710d64
Author: kl0u  Date: 2016-04-19T12:00:27Z
Removed unused classes.

commit 80ce550134d968fa833ddb3219d64d2bbe902318
Author: kl0u  Date: 2016-04-19T13:03:01Z
Reverted classes back to master version.

commit f7a89c90aee28d0d8fa074a3319318b32617e494
Author: kl0u  Date: 2016-04-19T16:24:18Z
A version of the monitor.

commit b9ed3159beed5a08ec96876c8d43781a738da3cb
Author: kl0u  Date: 2016-04-20T13:27:09Z
A non-faultolerant version that compiles.

commit 2ab79bf158d6974eea0f4220c51fc8d8e6c767dd
Author: kl0u  Date: 2016-04-20T14:46:27Z
Adding checkpointing functionality.

commit d920fadaefa64f16342359430cbc831c94a8321c
Author: kl0u  Date: 2016-04-20T15:49:48Z
Adding Tests.

commit f82948161317000bf5f4018c492e12df2f6237d9
Author: kl0u  Date: 2016-04-22T11:56:13Z
Fixed Reader bug.

commit 58df5ae96d48470c944ef2852b606bffe6bcbd70
Author: kl0u  Date: 2016-04-24T11:16:07Z
Restructuring the tests.

commit a10109354ea8858c94c04758d5f849fa738532c7
Author: kl0u  Date: 2016-04-24T21:19:11Z
Adding the map with the timestamp.
[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60924299

--- Diff: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabiliyTest.scala ---
@@ -0,0 +1,88 @@
[...]
+class PatternStreamScalaJavaAPIInteroperabiliyTest extends TestLogger {
+
+
--- End diff --

two line breaks
[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60924213 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala --- @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala + +import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern, Pattern => JPattern} + +import scala.reflect.ClassTag + +package object pattern { + /** +* Utility method to wrap [[org.apache.flink.cep.pattern.Pattern]] and its subclasses +* for usage with the Scala API. +* +* @param javaPattern The underlying pattern from the Java API +* @tparam T Base type of the elements appearing in the pattern +* @tparam F Subtype of T to which the current pattern operator is constrained +* @return A pattern from the Scala API which wraps the pattern from the Java API +*/ + private[flink] def wrapPattern[T: ClassTag, F <: T](javaPattern: JPattern[T, F]) --- End diff -- `ClassTag` should not be necessary if `FollowedByPattern` and `Pattern` are adapted accordingly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
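The change suggested in the review above — dropping the `ClassTag` context bound from `wrapPattern` — can be sketched as follows. This is a hypothetical, self-contained model (the `JPattern`/`Pattern` stand-ins below are illustrative, not the real Flink classes): dispatching on the concrete Java pattern type only needs an ordinary runtime pattern match, which requires no reified type information.

```scala
// Illustrative stand-ins for the Java API classes (not the real Flink types).
class JPattern[T, F <: T](val name: String)
class JFollowedByPattern[T, F <: T](name: String) extends JPattern[T, F](name)

// Illustrative stand-ins for the Scala API wrappers.
class Pattern[T, F <: T](val jPattern: JPattern[T, F])
class FollowedByPattern[T, F <: T](jfb: JFollowedByPattern[T, F]) extends Pattern[T, F](jfb)

object PatternWrapper {
  // No `ClassTag` context bound needed: choosing the wrapper class based on
  // the wrapped Java pattern's runtime class is a plain pattern match.
  def wrapPattern[T, F <: T](javaPattern: JPattern[T, F]): Pattern[T, F] =
    javaPattern match {
      case fbp: JFollowedByPattern[T, F] => new FollowedByPattern[T, F](fbp)
      case p                             => new Pattern[T, F](p)
    }
}
```

The type-parameter match against `JFollowedByPattern[T, F]` is subject to erasure, but since only the runtime class is inspected, that is exactly why the `ClassTag` is unnecessary here.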
[jira] [Created] (FLINK-3808) Refactor the whole file monitoring source to take a fileInputFormat as an argument.
Kostas Kloudas created FLINK-3808: - Summary: Refactor the whole file monitoring source to take a fileInputFormat as an argument. Key: FLINK-3808 URL: https://issues.apache.org/jira/browse/FLINK-3808 Project: Flink Issue Type: Sub-task Reporter: Kostas Kloudas Assignee: Kostas Kloudas This issue is just an intermediate step towards making the file source fault-tolerant. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60924026 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala.pattern + +import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern} + +import scala.reflect.ClassTag + +object FollowedByPattern { + /** +* Constructs a new Pattern by wrapping a given Java API Pattern +* +* @param jfbPattern Underlying Java API Pattern. +* @tparam T Base type of the elements appearing in the pattern +* @tparam F Subtype of T to which the current pattern operator is constrained +* @return New wrapping FollowedByPattern object +*/ + def apply[T: ClassTag, F <: T](jfbPattern: JFollowedByPattern[T, F]) = --- End diff -- I think the `ClassTag` should not be necessary here.
[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60923941 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala --- @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala.pattern + +import org.apache.flink.api.common.functions.FilterFunction +import org.apache.flink.cep +import org.apache.flink.cep.pattern.{Pattern => JPattern} +import org.apache.flink.streaming.api.windowing.time.Time + +import scala.reflect.ClassTag + +/** + * Base class for a pattern definition. + * + * A pattern definition is used by [[org.apache.flink.cep.nfa.compiler.NFACompiler]] to create + * a [[org.apache.flink.cep.nfa.NFA]]. 
+ * + * {{{ + * Pattern<T, F> pattern = Pattern.begin("start") + * .next("middle").subtype(F.class) + * .followedBy("end").where(new MyFilterFunction()); + * } + * }}} + * + * @param jPattern Underlying Java API Pattern + * @tparam T Base type of the elements appearing in the pattern + * @tparam F Subtype of T to which the current pattern operator is constrained + */ +class Pattern[T: ClassTag, F <: T](jPattern: JPattern[T, F]) { --- End diff -- For what do we need the `ClassTag` here? I think it shouldn't be necessary.
[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r60923428 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala --- @@ -401,6 +400,52 @@ class Table( new Table(relBuilder.build(), tableEnv) } + /** +* Sorts the given [[Table]]. Similar to SQL ORDER BY. The sorting is performed locally across +* all partitions with keys equal to the given fields. +* +* Example: +* +* {{{ +* tab.orderBy('name) +* }}} +*/ + def orderBy(fields: Expression*): Table = { +relBuilder.push(relNode) + +if (! fields.forall { + case x: UnresolvedFieldReference => true + case x@(_: Asc | _: Desc) => x.asInstanceOf[UnaryExpression] --- End diff -- With a common base class for `Asc` and `Desc`, this could be done more cleanly.
[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r60923473 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.LogicalSort +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSort} + +class DataSetSortRule + extends ConverterRule( +classOf[LogicalSort], +Convention.NONE, +DataSetConvention.INSTANCE, +"FlinkSortRule") { + override def convert(rel: RelNode): RelNode = { --- End diff -- Please add a new line.
[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1926#discussion_r60923266 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.expressions +import org.apache.calcite.rex.RexNode +import org.apache.calcite.tools.RelBuilder + +case class Asc(child: Expression) extends UnaryExpression { --- End diff -- Could you add a common base class for orderings which `Asc` and `Desc` extend?
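The common base class suggested in the review above can be sketched as follows. This is an illustrative, self-contained model using simplified stand-ins for the PR's expression classes (not the actual Flink code): with a shared `Ordering` base class, call sites such as `orderBy()` can match on a single type instead of enumerating `Asc` and `Desc`.

```scala
// Simplified stand-ins for the expression hierarchy in the PR (illustrative only).
abstract class Expression
abstract class UnaryExpression extends Expression { def child: Expression }

// Common base class for ordering expressions, as suggested in the review.
abstract class Ordering extends UnaryExpression

case class UnresolvedFieldReference(name: String) extends Expression
case class Asc(child: Expression) extends Ordering
case class Desc(child: Expression) extends Ordering

// The validation inside orderBy() then collapses to one case per concept,
// instead of matching Asc and Desc separately and casting.
def isValidOrderByField(e: Expression): Boolean = e match {
  case _: UnresolvedFieldReference => true
  case _: Ordering                 => true
  case _                           => false
}
```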
[GitHub] flink pull request: FLINK-3717 - Be able to re-start reading from ...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1895#issuecomment-214358848 I had some inline comments but overall the changes look good! I think we can simplify the `BinaryInputFormat` by getting rid of the `filePos` and `justReadAllRecords` fields and just snapshotting the `blockPos`. The `filePos` and `justReadAllRecords` values functionally depend on the `blockPos`, so storing the `filePos` and `justReadAllRecords` fields just adds more complexity since we're keeping track of all of them. The snapshot would then just be `(blockPos, readRecords)`; upon restore, the correct file read position can be derived from the block/split start position.
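The simplification proposed above can be sketched with a small model. This is a hypothetical illustration, not the actual `BinaryInputFormat` code: the field and class names (`BlockReaderState`, `SplitSnapshot`, `splitStart`) are assumptions chosen to mirror the discussion. The point is that only `(blockPos, readRecords)` needs to be persisted; the absolute file position is re-derived from the split's start offset on restore.

```scala
// What gets checkpointed: only the block-relative position and record count.
case class SplitSnapshot(blockPos: Long, readRecords: Long)

// Illustrative reader state for one split (hypothetical names).
class BlockReaderState(val splitStart: Long) {
  var blockPos: Long = 0L      // byte offset within the current block
  var readRecords: Long = 0L   // records consumed from the current block

  def snapshot: SplitSnapshot = SplitSnapshot(blockPos, readRecords)

  // Returns the absolute file position to seek to. Since filePos is
  // functionally dependent on blockPos, it need not be stored at all.
  def restore(s: SplitSnapshot): Long = {
    blockPos = s.blockPos
    readRecords = s.readRecords
    splitStart + blockPos
  }
}
```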
[GitHub] flink pull request: FLINK-3717 - Be able to re-start reading from ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1895#discussion_r60920236 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.io; + +import org.apache.flink.core.io.InputSplit; + +import java.io.IOException; +import java.io.Serializable; + --- End diff -- Ah, I mean: ```
/**
 * Extension of {@link InputFormat} that allows checkpointing/restoring the current channel state.
 *
 * @param The type of the produced records.
 * @param The type of input split.
 * @param The type of the channel state snapshot.
 */
public interface CheckpointableInputFormat extends InputFormat {
```
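The interface shape discussed above can be sketched in miniature. This is a hedged illustration, not the signature that was eventually merged into Flink: the type-parameter names and the trivial `ListInputFormat` example are assumptions made for the sketch. The idea is an input format whose read position can be snapshotted and later restored, so reading resumes exactly where it stopped.

```scala
// Minimal model of a checkpointable input format (hypothetical, not Flink's API).
trait InputFormat[OT, S] {
  def open(split: S): Unit
  def nextRecord(): Option[OT]
}

trait CheckpointableInputFormat[OT, S, C <: java.io.Serializable] extends InputFormat[OT, S] {
  def getCurrentState: C                 // snapshot of the current channel state
  def reopen(split: S, state: C): Unit   // resume reading from a snapshot
}

// A trivial list-backed implementation, just to show the contract in action.
class ListInputFormat(data: List[Int]) extends CheckpointableInputFormat[Int, Unit, java.lang.Long] {
  private var pos = 0
  def open(split: Unit): Unit = pos = 0
  def nextRecord(): Option[Int] =
    if (pos < data.length) { val r = data(pos); pos += 1; Some(r) } else None
  def getCurrentState: java.lang.Long = pos.toLong
  def reopen(split: Unit, state: java.lang.Long): Unit = pos = state.toInt
}
```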
[GitHub] flink pull request: FLINK-3717 - Be able to re-start reading from ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1895#discussion_r60919358 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.io; + +import org.apache.flink.core.io.InputSplit; + +import java.io.IOException; +import java.io.Serializable; + --- End diff -- We could add a comment here, something like "Extension of {@link InputFormat} that allows checkpointing/restoring the current channel state". Also, I think we should make the signature match `InputFormat`, i.e.: `public interface CheckpointableInputFormat extends InputFormat`
[GitHub] flink pull request: FLINK-3717 - Be able to re-start reading from ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1895#discussion_r60916945 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java --- @@ -398,16 +401,20 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc return null; } - /** -* Opens the given input split. This method opens the input stream to the specified file, allocates read buffers -* and positions the stream at the correct position, making sure that any partial record at the beginning is skipped. -* -* @param split The input split to open. -* -* @see org.apache.flink.api.common.io.FileInputFormat#open(org.apache.flink.core.fs.FileInputSplit) -*/ @Override public void open(FileInputSplit split) throws IOException { + this.open(split, null); + } + /** --- End diff -- Indentation is off
[GitHub] flink pull request: Add util for TDD instantiation
Github user uce commented on the pull request: https://github.com/apache/flink/pull/1890#issuecomment-214338353 I like this, Chesnay. Definitely makes the tests more readable. Feel free to merge it if more people think this is useful.
[GitHub] flink pull request: FLINK-3717 - Be able to re-start reading from ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1895#discussion_r60915635 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java --- @@ -337,4 +402,18 @@ public int read(byte[] b, int off, int len) throws IOException { return totalRead; } } + + // --- End diff -- In most parts of the code we have dashes instead of equals signs, I think, like this: ```
// ------------------------------------------------------------
//  Checkpointing
// ------------------------------------------------------------
```
[GitHub] flink pull request: FLINK-3717 - Be able to re-start reading from ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1895#discussion_r60914650 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java --- @@ -40,36 +41,46 @@ import java.util.List; /** - * Base class for all input formats that use blocks of fixed size. The input splits are aligned to these blocks. Without - * configuration, these block sizes equal the native block sizes of the HDFS. + * Base class for all input formats that use blocks of fixed size. The input splits are aligned to these blocks, + * meaning that each split will consist of one block. Without configuration, these block sizes equal the native + * block sizes of the HDFS. + * + * A block will contain a {@link BlockInfo} at the end of the block. There, the reader can find some statistics + * about the split currently being read, that will help correctly parse the contents of the block. */ @Public -public abstract class BinaryInputFormat extends FileInputFormat { +public abstract class BinaryInputFormat extends FileInputFormat + implements CheckpointableInputFormat> { + private static final long serialVersionUID = 1L; - /** -* The log. -*/ + /** The log. */ private static final Logger LOG = LoggerFactory.getLogger(BinaryInputFormat.class); - /** -* The config parameter which defines the fixed length of a record. -*/ + /** The config parameter which defines the fixed length of a record. */ public static final String BLOCK_SIZE_PARAMETER_KEY = "input.block_size"; public static final long NATIVE_BLOCK_SIZE = Long.MIN_VALUE; - /** -* The block size to use. -*/ + /** The block size to use. */ private long blockSize = NATIVE_BLOCK_SIZE; private transient DataInputViewStreamWrapper dataInputStream; + /** The BlockInfo for the Block corresponding to the split currently being read. */ private transient BlockInfo blockInfo; + /** A wrapper around the block currently being read. 
*/ + private transient BlockBasedInput blockBasedInput = null; + + /** +* The number of records already read from the block. +* This is used to decide if the end of the block has been +* reached. +*/ private long readRecords; +// private transient Tuple3 restoredState; --- End diff -- Probably left over from the previous version of the code.
[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...
Github user twalthr commented on the pull request: https://github.com/apache/flink/pull/1916#issuecomment-214332719 Thanks for the contribution @yjshen. I will also have a look at it tomorrow.