[jira] [Commented] (FLINK-3811) Refactor ExecutionEnvironment in TableEnvironment

2016-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15257140#comment-15257140
 ] 

ASF GitHub Bot commented on FLINK-3811:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/1930

[FLINK-3811] [tableAPI] Use only Java ExecutionEnvironments in 
TableEnvironments

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [X] General
  - The pull request references the related JIRA issue
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink tableJavaEnv

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1930.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1930


commit 38d06e0922932dc835abaf460d293fa96e51e4fc
Author: Fabian Hueske 
Date:   2016-04-25T16:58:23Z

[FLINK-3811] [tableAPI] Use only Java ExecutionEnvironments in 
TableEnvironments.




> Refactor ExecutionEnvironment in TableEnvironment
> -------------------------------------------------
>
> Key: FLINK-3811
> URL: https://issues.apache.org/jira/browse/FLINK-3811
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Minor
> Fix For: 1.1.0
>
>
> Currently, the Scala BatchTableEnvironment has a reference to a Scala 
> ExecutionEnvironment and the Java BatchTableEnvironment uses the Java 
> ExecutionEnvironment. The same applies to their streaming counterparts.
> This requires special implementations for Java / Scala, for instance to create 
> new data sources.
> I propose to refactor the TableEnvironments such that only the Java execution 
> environments for batch and streaming are used.





[GitHub] flink pull request: [FLINK-3811] [tableAPI] Use only Java Executio...

2016-04-25 Thread fhueske
GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/1930

[FLINK-3811] [tableAPI] Use only Java ExecutionEnvironments in 
TableEnvironments

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [X] General
  - The pull request references the related JIRA issue
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink tableJavaEnv

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1930.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1930


commit 38d06e0922932dc835abaf460d293fa96e51e4fc
Author: Fabian Hueske 
Date:   2016-04-25T16:58:23Z

[FLINK-3811] [tableAPI] Use only Java ExecutionEnvironments in 
TableEnvironments.






[jira] [Created] (FLINK-3811) Refactor ExecutionEnvironment in TableEnvironment

2016-04-25 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-3811:


 Summary: Refactor ExecutionEnvironment in TableEnvironment
 Key: FLINK-3811
 URL: https://issues.apache.org/jira/browse/FLINK-3811
 Project: Flink
  Issue Type: Improvement
  Components: Table API
Reporter: Fabian Hueske
Assignee: Fabian Hueske
Priority: Minor
 Fix For: 1.1.0


Currently, the Scala BatchTableEnvironment has a reference to a Scala 
ExecutionEnvironment and the Java BatchTableEnvironment uses the Java 
ExecutionEnvironment. The same applies to their streaming counterparts.
This requires special implementations for Java / Scala, for instance to create 
new data sources.

I propose to refactor the TableEnvironments such that only the Java execution 
environments for batch and streaming are used.





[jira] [Updated] (FLINK-3222) Incorrect shift amount in OperatorCheckpointStats#hashCode()

2016-04-25 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-3222:
--
Description: 
Here is related code:
{code}
result = 31 * result + (int) (subTaskStats.length ^ (subTaskStats.length 
>>> 32));
{code}
subTaskStats.length is an int.


The shift amount is greater than 31 bits.
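
For illustration, a minimal sketch of the effect (not the project's fix): Java 
takes an int's shift distance modulo 32, so "length >>> 32" returns length 
unchanged and the xor zeroes the field's contribution to the hash.
{code}
public class ShiftDemo {
    public static void main(String[] args) {
        int length = 42;
        // For an int, the shift distance is taken mod 32, so ">>> 32" is a
        // no-op and "length ^ (length >>> 32)" is always 0:
        System.out.println(length ^ (length >>> 32)); // prints 0

        // The xor-shift idiom is meant for longs; widen first if it is kept:
        long widened = length;
        int hash = 31 * 17 + (int) (widened ^ (widened >>> 32));

        // For a plain int field, adding the value directly is sufficient:
        int simpler = 31 * 17 + length;
        System.out.println(hash + " " + simpler); // both print 569 here
    }
}
{code}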

  was:
Here is related code:
{code}
result = 31 * result + (int) (subTaskStats.length ^ (subTaskStats.length 
>>> 32));
{code}
subTaskStats.length is an int.

The shift amount is greater than 31 bits.


> Incorrect shift amount in OperatorCheckpointStats#hashCode()
> ------------------------------------------------------------
>
> Key: FLINK-3222
> URL: https://issues.apache.org/jira/browse/FLINK-3222
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> Here is related code:
> {code}
> result = 31 * result + (int) (subTaskStats.length ^ (subTaskStats.length 
> >>> 32));
> {code}
> subTaskStats.length is an int.
> The shift amount is greater than 31 bits.





[jira] [Updated] (FLINK-3733) registeredTypesWithKryoSerializers is not assigned in ExecutionConfig#deserializeUserCode()

2016-04-25 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-3733:
--
Description: 
{code}
if (serializedRegisteredTypesWithKryoSerializers != null) {
  registeredTypesWithKryoSerializers = 
serializedRegisteredTypesWithKryoSerializers.deserializeValue(userCodeClassLoader);
} else {
  registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
}
{code}
When serializedRegisteredTypesWithKryoSerializers is null, 
registeredTypesWithKryoSerializers is not assigned.
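
A sketch of the presumably intended branch (mirroring the snippet above; this 
is not the committed fix): the else arm should initialize the same field that 
the if arm assigns.
{code}
if (serializedRegisteredTypesWithKryoSerializers != null) {
  registeredTypesWithKryoSerializers =
    serializedRegisteredTypesWithKryoSerializers.deserializeValue(userCodeClassLoader);
} else {
  // was: registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
  registeredTypesWithKryoSerializers = new LinkedHashMap<>();
}
{code}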

  was:
{code}
if (serializedRegisteredTypesWithKryoSerializers != null) {
  registeredTypesWithKryoSerializers = 
serializedRegisteredTypesWithKryoSerializers.deserializeValue(userCodeClassLoader);
} else {
  registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
}
{code}

When serializedRegisteredTypesWithKryoSerializers is null, 
registeredTypesWithKryoSerializers is not assigned.


> registeredTypesWithKryoSerializers is not assigned in 
> ExecutionConfig#deserializeUserCode()
> ---------------------------------------------
>
> Key: FLINK-3733
> URL: https://issues.apache.org/jira/browse/FLINK-3733
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> if (serializedRegisteredTypesWithKryoSerializers != null) {
>   registeredTypesWithKryoSerializers = 
> serializedRegisteredTypesWithKryoSerializers.deserializeValue(userCodeClassLoader);
> } else {
>   registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
> }
> {code}
> When serializedRegisteredTypesWithKryoSerializers is null, 
> registeredTypesWithKryoSerializers is not assigned.





[jira] [Updated] (FLINK-3734) Unclosed DataInputView in AbstractAlignedProcessingTimeWindowOperator#restoreState()

2016-04-25 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-3734:
--
Description: 
{code}
DataInputView in = inputState.getState(getUserCodeClassloader());

final long nextEvaluationTime = in.readLong();
final long nextSlideTime = in.readLong();

AbstractKeyedTimePanes panes = 
createPanes(keySelector, function);
panes.readFromInput(in, keySerializer, stateTypeSerializer);

restoredState = new RestoredState<>(panes, nextEvaluationTime, 
nextSlideTime);
  }
{code}
DataInputView in is not closed upon return.
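
For illustration, the general pattern a fix could follow (a sketch only; 
openBackingStream() is a stand-in for however the snapshot bytes are obtained 
here, not an actual Flink API):
{code}
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

public class CloseOnRestoreSketch {
    // Stand-in for obtaining the checkpoint bytes; illustrative only.
    static InputStream openBackingStream() {
        return new ByteArrayInputStream(new byte[16]); // two longs worth of data
    }

    static void restore() throws IOException {
        // try-with-resources closes the stream on both return and exception.
        try (DataInputStream in = new DataInputStream(openBackingStream())) {
            final long nextEvaluationTime = in.readLong();
            final long nextSlideTime = in.readLong();
            // ... read the panes and build the restored state ...
        }
    }
}
{code}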

  was:
{code}
DataInputView in = inputState.getState(getUserCodeClassloader());

final long nextEvaluationTime = in.readLong();
final long nextSlideTime = in.readLong();

AbstractKeyedTimePanes panes = 
createPanes(keySelector, function);
panes.readFromInput(in, keySerializer, stateTypeSerializer);

restoredState = new RestoredState<>(panes, nextEvaluationTime, 
nextSlideTime);
  }
{code}

DataInputView in is not closed upon return.


> Unclosed DataInputView in 
> AbstractAlignedProcessingTimeWindowOperator#restoreState()
> --------------------------------------------------------
>
> Key: FLINK-3734
> URL: https://issues.apache.org/jira/browse/FLINK-3734
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> DataInputView in = inputState.getState(getUserCodeClassloader());
> final long nextEvaluationTime = in.readLong();
> final long nextSlideTime = in.readLong();
> AbstractKeyedTimePanes panes = 
> createPanes(keySelector, function);
> panes.readFromInput(in, keySerializer, stateTypeSerializer);
> restoredState = new RestoredState<>(panes, nextEvaluationTime, 
> nextSlideTime);
>   }
> {code}
> DataInputView in is not closed upon return.





[GitHub] flink pull request: Flink 3691 extend avroinputformat to support g...

2016-04-25 Thread greghogan
Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/1920#issuecomment-214463228
  
One more thought, since I've had this problem before. Because your pull 
request started with multiple commits, the title chosen by GitHub is based on 
your branch name rather than the commit message. Since the formatting is 
different, the PR does not automatically attach messages to the JIRA ticket, 
and the watchers probably haven't noticed your PR.




[GitHub] flink pull request: Flink 3691 extend avroinputformat to support g...

2016-04-25 Thread greghogan
Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/1920#issuecomment-214461886
  
Thanks for your contribution @gna-phetsarath.

I see there was a question in FLINK-3691 as to whether Flink already 
serializes GenericRecords. It looks to me that these are currently processed by 
`ReflectDatumReader`, whereas with this PR they would use `GenericDatumReader`.

Could you give a short example of data that can't be processed currently 
but works with this PR? Is it also more efficient to use 
`GenericDatumReader`?
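
For context, a minimal stand-alone sketch of reading an Avro file as 
`GenericRecord`s with `GenericDatumReader` (the file path and field name are 
illustrative):
```
import java.io.File;
import java.io.IOException;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class GenericReadSketch {
    public static void main(String[] args) throws IOException {
        // No generated classes required: the reader takes the schema embedded
        // in the file and yields GenericRecord instances.
        File avroFile = new File("users.avro");
        try (DataFileReader<GenericRecord> reader =
                 new DataFileReader<>(avroFile, new GenericDatumReader<GenericRecord>())) {
            while (reader.hasNext()) {
                GenericRecord record = reader.next();
                System.out.println(record.get("name")); // fields accessed by name
            }
        }
    }
}
```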




[GitHub] flink pull request: FLINK-3717 - Be able to re-start reading from ...

2016-04-25 Thread kl0u
Github user kl0u commented on the pull request:

https://github.com/apache/flink/pull/1895#issuecomment-214448465
  
Hi @aljoscha, thanks a lot for the comments.
I addressed them. Please let me know what you think.




[GitHub] flink pull request: Flink 3691 extend avroinputformat to support g...

2016-04-25 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1920#discussion_r60952095
  
--- Diff: 
flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
 ---
@@ -289,6 +290,119 @@ public void testDeserializeToSpecificType() throws 
IOException {
}
}
 
+   /**
+* Test if the AvroInputFormat is able to properly read data from an 
Avro
+* file as a GenericRecord.
+* 
+* @throws IOException,
+* if there is an exception
+*/
+   @SuppressWarnings("unchecked")
+   @Test
+   public void testDeserialisationGenericRecord() throws IOException {
+   Configuration parameters = new Configuration();
+
+   AvroInputFormat<GenericRecord> format = new 
AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()),
+   GenericRecord.class);
+   try {
+   format.configure(parameters);
--- End diff --

Should we explicitly set reuse to `true`?




[GitHub] flink pull request: Flink 3691 extend avroinputformat to support g...

2016-04-25 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1920#discussion_r60950868
  
--- Diff: 
flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
 ---
@@ -289,6 +290,119 @@ public void testDeserializeToSpecificType() throws 
IOException {
}
}
 
+   /**
+* Test if the AvroInputFormat is able to properly read data from an 
Avro
+* file as a GenericRecord.
+* 
+* @throws IOException,
+* if there is an exception
+*/
+   @SuppressWarnings("unchecked")
+   @Test
+   public void testDeserialisationGenericRecord() throws IOException {
+   Configuration parameters = new Configuration();
+
+   AvroInputFormat<GenericRecord> format = new 
AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()),
+   GenericRecord.class);
+   try {
+   format.configure(parameters);
+   FileInputSplit[] splits = format.createInputSplits(1);
+   assertEquals(splits.length, 1);
+   format.open(splits[0]);
+
+   GenericRecord u = format.nextRecord(null);
--- End diff --

I think this should be `format.nextRecord()` since `nextRecord(reuse)` 
requires an object to be passed in.




[GitHub] flink pull request: Flink 3691 extend avroinputformat to support g...

2016-04-25 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1920#discussion_r60949545
  
--- Diff: 
flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
 ---
@@ -119,12 +144,18 @@ public E nextRecord(E reuseValue) throws IOException {
if (reachedEnd()) {
return null;
}
-   
-   if (!reuseAvroValue) {
-   reuseValue = 
InstantiationUtil.instantiate(avroValueType, Object.class);
+   if (isGenericRecord) {
--- End diff --

Can we replace the nested-if with `return reuseAvroValue ? 
dataFileReader.next(reuseValue) : dataFileReader.next();`, remove the 
`isGenericRecord` field, and flatten the `GenericDatumReader` / 
`SpecificDatumReader` / `ReflectDatumReader` if-statement above?
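
i.e. roughly this (a sketch of the suggestion, assuming `dataFileReader` 
offers both `next()` and `next(reuse)` as Avro's `FileReader` does):
```
@Override
public E nextRecord(E reuseValue) throws IOException {
    if (reachedEnd()) {
        return null;
    }
    // One branch instead of the nested ifs: either reuse the passed-in
    // object or let the reader allocate a fresh one.
    return reuseAvroValue ? dataFileReader.next(reuseValue) : dataFileReader.next();
}
```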




[GitHub] flink pull request: Flink 3691 extend avroinputformat to support g...

2016-04-25 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1920#discussion_r60948974
  
--- Diff: 
flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
 ---
@@ -51,11 +62,13 @@
private transient FileReader<E> dataFileReader;
 
private transient long end;
-
+   
+   private boolean isGenericRecord = false;

public AvroInputFormat(Path filePath, Class<E> type) {
super(filePath);
this.avroValueType = type;
+   this.isGenericRecord = 
org.apache.avro.generic.GenericRecord.class == avroValueType;
--- End diff --

Should this be testing class equality or Class.isAssignableFrom?
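
The difference in a nutshell (sketch): `GenericData.Record` implements 
`GenericRecord`, so an exact class check only matches when the user passes 
`GenericRecord.class` itself.
```
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class ClassCheckSketch {
    public static void main(String[] args) {
        Class<?> passed = GenericData.Record.class; // a subtype of GenericRecord
        System.out.println(GenericRecord.class == passed);                 // false
        System.out.println(GenericRecord.class.isAssignableFrom(passed));  // true
    }
}
```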




[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

2016-04-25 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60945888
  
--- Diff: flink-libraries/flink-cep-scala/pom.xml ---
@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-libraries</artifactId>
+		<version>1.1-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-cep-scala_2.10</artifactId>
+	<name>flink-cep-scala</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-cep_2.10</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.ow2.asm</groupId>
+			<artifactId>asm</artifactId>
+			<version>${asm.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-tests_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-cep_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.scala-tools</groupId>
+				<artifactId>maven-scala-plugin</artifactId>
+				<version>2.15.2</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>compile</goal>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<sourceDir>src/main/scala</sourceDir>
+					<testSourceDir>src/test/scala</testSourceDir>
+					<jvmArgs>
+						<jvmArg>-Xms64m</jvmArg>
+						<jvmArg>-Xmx1024m</jvmArg>
+					</jvmArgs>
+				</configuration>
+			</plugin>
--- End diff --

Can we maybe replace this plugin with
```
<plugin>
	<groupId>net.alchim31.maven</groupId>
	<artifactId>scala-maven-plugin</artifactId>
	<version>3.2.2</version>
	<executions>
		<execution>
			<goals>
				<goal>compile</goal>
				<goal>testCompile</goal>
			</goals>
		</execution>
	</executions>
</plugin>
```
I gave you the definition for the old Scala plugin. Sorry, my bad.




[jira] [Created] (FLINK-3810) Missing break in ZooKeeperLeaderElectionService#handleStateChange()

2016-04-25 Thread Ted Yu (JIRA)
Ted Yu created FLINK-3810:
-

 Summary: Missing break in 
ZooKeeperLeaderElectionService#handleStateChange()
 Key: FLINK-3810
 URL: https://issues.apache.org/jira/browse/FLINK-3810
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu


{code}
  protected void handleStateChange(ConnectionState newState) {
switch (newState) {
  case CONNECTED:
LOG.debug("Connected to ZooKeeper quorum. Leader election can start.");
  case SUSPENDED:
LOG.warn("Connection to ZooKeeper suspended. The contender " + 
leaderContender.getAddress()
  + "no longer participates in the leader election.");
  case RECONNECTED:
LOG.info("Connection to ZooKeeper was reconnected. Leader election can 
be restarted.");
  case LOST:
// Maybe we have to throw an exception here to terminate the JobManager
LOG.warn("Connection to ZooKeeper lost. The contender " + 
leaderContender.getAddress()
  + "no longer participates in the leader election.");
{code}
Any of the first three states would result in multiple log messages being emitted.
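
For illustration, the presumably intended shape (a sketch, not the committed 
fix), with a break per case and the missing space before "no longer" restored:
{code}
protected void handleStateChange(ConnectionState newState) {
  switch (newState) {
    case CONNECTED:
      LOG.debug("Connected to ZooKeeper quorum. Leader election can start.");
      break;
    case SUSPENDED:
      LOG.warn("Connection to ZooKeeper suspended. The contender " + leaderContender.getAddress()
        + " no longer participates in the leader election.");
      break;
    case RECONNECTED:
      LOG.info("Connection to ZooKeeper was reconnected. Leader election can be restarted.");
      break;
    case LOST:
      // Maybe we have to throw an exception here to terminate the JobManager
      LOG.warn("Connection to ZooKeeper lost. The contender " + leaderContender.getAddress()
        + " no longer participates in the leader election.");
      break;
  }
}
{code}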





[jira] [Created] (FLINK-3809) Missing break in ZooKeeperLeaderRetrievalService#handleStateChange()

2016-04-25 Thread Ted Yu (JIRA)
Ted Yu created FLINK-3809:
-

 Summary: Missing break in 
ZooKeeperLeaderRetrievalService#handleStateChange()
 Key: FLINK-3809
 URL: https://issues.apache.org/jira/browse/FLINK-3809
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu


{code}
  protected void handleStateChange(ConnectionState newState) {
switch (newState) {
  case CONNECTED:
LOG.debug("Connected to ZooKeeper quorum. Leader retrieval can start.");
  case SUSPENDED:
LOG.warn("Connection to ZooKeeper suspended. Can no longer retrieve the 
leader from " +
  "ZooKeeper.");
  case RECONNECTED:
LOG.info("Connection to ZooKeeper was reconnected. Leader retrieval can 
be restarted.");
  case LOST:
LOG.warn("Connection to ZooKeeper lost. Can no longer retrieve the 
leader from " +
  "ZooKeeper.");
}
{code}
Except for the LOST state, the other states would lead to multiple log messages.





[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60943147
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction
+   extends RichSourceFunction {
--- End diff --

Actually, `addSource()` contains this line:
`boolean isParallel = function instanceof ParallelSourceFunction;`

whose result is passed to the constructor of the `StreamSource`.




[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60942903
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionITCase.java
 ---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class FileSplitMonitoringFunctionITCase extends 
StreamingProgramTestBase {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 200;
+
+   private File baseDir;
+
+   private org.apache.hadoop.fs.FileSystem hdfs;
+   private String hdfsURI;
+   private MiniDFSCluster hdfsCluster;
--- End diff --

I would prefer local files since
* we would no longer require the Hadoop dependency
* it probably reduces the test time
* you interact very little with the FileSystem anyway




[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60942837
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction
+   extends RichSourceFunction {
--- End diff --

Yup, you're right, I didn't know about the RichParallelSourceFunction class.




[GitHub] flink pull request: [FLINK-1827] and small fixes in some tests

2016-04-25 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1915#discussion_r60942417
  
--- Diff: flink-test-utils/pom.xml ---
@@ -7,9 +7,7 @@ regarding copyright ownership.  The ASF licenses this file
 to you under the Apache License, Version 2.0 (the
 "License"); you may not use this file except in compliance
 with the License.  You may obtain a copy of the License at
-
--- End diff --

The license header looks to have been automatically reformatted.




[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60942382
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction
+   extends RichSourceFunction {
--- End diff --

addSource(...) makes no such guarantee; all calls like readFile(...) etc. 
go through addSource(...).

You could be right about the NonParallelInput though, will check quickly.




[GitHub] flink pull request: [FLINK-1827] and small fixes in some tests

2016-04-25 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1915#issuecomment-214425488
  
Thanks for your contribution @stefanobortoli :-) I think the changes look 
good and can be merged.




[GitHub] flink pull request: [FLINK-1827] and small fixes in some tests

2016-04-25 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1915#discussion_r60940770
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
 ---
@@ -53,41 +59,55 @@ public void testStartupWhenTaskmanagerActorPortIsUsed() 
{
blocker = new ServerSocket(0, 50, localAddress);
final int port = blocker.getLocalPort();
 
-   try {
-   TaskManager.runTaskManager(
-   localHostName,
-   ResourceID.generate(),
-   port,
-   new Configuration(),
+   TaskManager.runTaskManager(localHostName, 
ResourceID.generate(), port, new Configuration(),
TaskManager.class);
-   fail("This should fail with an IOException");
-   }
-   catch (IOException e) {
-   // expected. validate the error message
-   assertNotNull(e.getMessage());
-   assertTrue(e.getMessage().contains("Address 
already in use"));
+   fail("This should fail with an IOException");
+
+   } catch (IOException e) {
+   // expected. validate the error message
+   List<Throwable> causes = getExceptionCauses(e, new 
ArrayList<Throwable>());
+   for (Throwable cause : causes) {
+   if (cause instanceof BindException) {
+   throw (BindException) cause;
+   }
}
-
-   }
-   catch (Exception e) {
+   fail("This should fail with an exception caused by 
BindException");
+   } catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
-   }
-   finally {
+   } finally {
if (blocker != null) {
try {
blocker.close();
-   }
-   catch (IOException e) {
+   } catch (IOException e) {
// no need to log here
}
}
}
}
 
/**
-* Tests that the TaskManager startup fails synchronously when the I/O 
directories are
-* not writable.
+* A utility method to analyze the exceptions and collect the clauses
+* 
+* @param e
+*the root exception (Throwable) object
+* @param causes
+*the list of exceptions that caused the root exceptions
+* @return
+*/
+   private List<Throwable> getExceptionCauses(Throwable e, List<Throwable> 
causes) {
--- End diff --

What about factoring this method out into a Utils class so that it can be 
used by multiple tests? This would avoid code duplication.
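
A sketch of what such a shared helper could look like (the class name and 
location are assumptions):
```
import java.util.ArrayList;
import java.util.List;

// Hypothetical shared test utility: walks the cause chain of a Throwable
// and collects every exception in it, starting from the root.
public final class TestExceptionUtils {

    private TestExceptionUtils() {}

    public static List<Throwable> getExceptionCauses(Throwable root) {
        List<Throwable> causes = new ArrayList<>();
        for (Throwable t = root; t != null; t = t.getCause()) {
            causes.add(t);
            if (t.getCause() == t) {
                break; // guard against self-referential cause chains
            }
        }
        return causes;
    }
}
```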




[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1929#issuecomment-214420551
  
We may want to think about adding a `createInputSplits(int minNumSplits, 
List files)` overload to the `FileInputFormat` class; as it stands it scans through 
the entire directory even though we could already exclude several files.
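
A hypothetical sketch of such an overload (the parameter type and the 
one-split-per-file strategy are assumptions, not existing Flink API):
```
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.FileStatus;

public class PreFilteredSplitting {

    // Computes splits only for files the caller has already filtered, instead
    // of rescanning the whole directory. A real implementation would also
    // split large files along block boundaries; minNumSplits is kept for
    // signature parity and ignored in this sketch.
    public static FileInputSplit[] createInputSplits(int minNumSplits, List<FileStatus> files)
            throws IOException {
        List<FileInputSplit> splits = new ArrayList<>();
        for (FileStatus file : files) {
            splits.add(new FileInputSplit(splits.size(), file.getPath(), 0, file.getLen(), null));
        }
        return splits.toArray(new FileInputSplit[splits.size()]);
    }
}
```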




[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60939797
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction
+   extends RichSourceFunction {
--- End diff --

This is used for input formats, AFAIK. The RichSourceFunction is guaranteed 
to be of parallelism 1, as I understand from the 
addSource(SourceFunction function, String sourceName, TypeInformation typeInfo) 
in the StreamExecutionEnvironment.




[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60939583
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction
+   extends RichSourceFunction {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered.
+*/
+   public enum WatchType {
+   REPROCESS_WITH_APPENDED // Reprocesses the whole file 
when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the 
parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}). */
+   private final WatchType watchType;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = true;
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat format, String path,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this(format, path, FilePathFilter.DefaultFilter.getInstance(), 
watchType, readerParallelism, interval);
+   }
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat format, String path, FilePathFilter filter,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this.format = Preconditions.checkNotNull(format);
+   this.path = Preconditions.checkNotNull(path);
+
+   Preconditions.checkArgument(interval >= 100,
+   "The specified monitoring interval is smaller than the 
minimum allowed one (100 ms).");
+   this.interval = interval;
+
+   this.watchType = watchType;
+
+   this.pathFilter = Preconditions.checkNotNull(filter);
+
+   this.readerParallelism = Math.max(readerParallelism, 1);
+   this.globalModificationTime = Long.MIN_VALUE;
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public void open(Configuration parameters) throws Exception {
+   super.open(parameters);
+   format.configure(parameters);
+   }
+
+   /**
+* Creates the input splits for the path to be assigned to the 
downstream tasks.
+* Those are going to read their contents for further processing.

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60939625
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction
+   extends RichSourceFunction {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered.
+*/
+   public enum WatchType {
+   REPROCESS_WITH_APPENDED // Reprocesses the whole file 
when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the 
parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}). */
+   private final WatchType watchType;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = true;
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat format, String path,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this(format, path, FilePathFilter.DefaultFilter.getInstance(), 
watchType, readerParallelism, interval);
+   }
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat format, String path, FilePathFilter filter,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this.format = Preconditions.checkNotNull(format);
+   this.path = Preconditions.checkNotNull(path);
+
+   Preconditions.checkArgument(interval >= 100,
+   "The specified monitoring interval is smaller than the 
minimum allowed one (100 ms).");
+   this.interval = interval;
+
+   this.watchType = watchType;
+
+   this.pathFilter = Preconditions.checkNotNull(filter);
+
+   this.readerParallelism = Math.max(readerParallelism, 1);
+   this.globalModificationTime = Long.MIN_VALUE;
+   }
+
+   @Override
+   @SuppressWarnings("unchecked")
+   public void open(Configuration parameters) throws Exception {
+   super.open(parameters);
+   format.configure(parameters);
+   }
+
+   /**
+* Creates the input splits for the path to be assigned to the 
downstream tasks.
+* Those are going to read their contents for further processing.

[GitHub] flink pull request: [FLINK-3778] [shell] Forward configuration fro...

2016-04-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1906




[jira] [Closed] (FLINK-3778) ScalaShellRemoteStreamEnvironment cannot be forwarded a user configuration

2016-04-25 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-3778.

Resolution: Fixed

Fixed via 7498d3e35a29449270a88a30eb32b7de74887f5b

> ScalaShellRemoteStreamEnvironment cannot be forwarded a user configuration
> --------------------------------------------------------------------------
>
> Key: FLINK-3778
> URL: https://issues.apache.org/jira/browse/FLINK-3778
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala Shell
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> The {{ScalaShellRemoteStreamEnvironment}} cannot unlike the 
> {{ScalaShellRemoteEnvironment}} be configured with a user configuration. This 
> effectively prohibits an user to connect against an HA cluster. I think it 
> would be good to be able to specify a user configuration.





[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60938941
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction
+   extends RichSourceFunction {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered.
+*/
+   public enum WatchType {
+   REPROCESS_WITH_APPENDED // Reprocesses the whole file 
when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the 
parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}). */
+   private final WatchType watchType;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = true;
--- End diff --

Now this is really nit-picky, but I would prefer this being false by 
default and being set to true in open().
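
i.e. a sketch of the suggested life-cycle (illustrative only):
```
private volatile boolean isRunning = false;

@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    format.configure(parameters);
    isRunning = true; // armed only once the function is opened
}

@Override
public void cancel() {
    isRunning = false; // stops the monitoring loop in run()
}
```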




[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60938712
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task which monitors a user-provided 
path and assigns splits
+ * to downstream tasks for further reading and processing. Which splits 
will be further processed
+ * depends on the user-provided {@link 
FileSplitMonitoringFunction.WatchType}.
+ */
+public class FileSplitMonitoringFunction<OUT>
+   extends RichSourceFunction<FileInputSplit> {
--- End diff --

This function should implement NonParallelInput; it guarantees that it will 
always run with a parallelism of 1.
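
For reference, a rough sketch of that change (assuming the marker 
interface meant here is org.apache.flink.api.common.io.NonParallelInput, 
which declares no methods, so adopting it is a one-line change):

    public class FileSplitMonitoringFunction<OUT>
            extends RichSourceFunction<FileInputSplit>
            implements NonParallelInput {
        // body unchanged
    }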


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60937977
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.Path;
+
+import java.io.Serializable;
+
+public interface FilePathFilter extends Serializable {
--- End diff --

Please add a JavaDoc to this class describing how/where it is used.
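
Something along these lines would do (the wording is only a suggestion, 
and the filterPath(Path) signature is assumed from how the monitoring 
function uses the filter):

    /**
     * An interface to be implemented by the user, deciding which paths
     * the {@link FileSplitMonitoringFunction} should ignore while
     * monitoring a directory. Paths for which the filter returns
     * {@code true} are excluded and no splits are created for them.
     */
    public interface FilePathFilter extends Serializable {

        /**
         * Returns {@code true} if the given path should be excluded
         * from processing.
         */
        boolean filterPath(Path filePath);
    }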


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60937348
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -35,12 +35,9 @@
 import java.util.Set;
 
 /**
- * This is the single (non-parallel) task, that monitors a user-procided 
path, and assigns splits
- * to downstream tasks for further reading and processing, depending on 
the user-provided {@link FileSplitMonitoringFunction.WatchType}.
- *
- * This method keeps track of which splits have already being processed by 
which task, and at which point
- * in the file we are currently processing, at the granularity of the 
split. In addition, it keeps track
- * of the last modification time for each file, so that it can detect new 
data.
+ * This is the single (non-parallel) task, that monitors a user-provided 
path, and assigns splits
--- End diff --

both commas don't belong in this sentence.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60936805
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionITCase.java
 ---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class FileSplitMonitoringFunctionITCase extends 
StreamingProgramTestBase {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 200;
+
+   private File baseDir;
+
+   private org.apache.hadoop.fs.FileSystem hdfs;
+   private String hdfsURI;
+   private MiniDFSCluster hdfsCluster;
--- End diff --

Yes, a local filesystem would do the job. 
Just wanted to have some tests with HDFS to be sure that it works, as this 
is closer to a distributed deployment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-25 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r60936652
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import 
org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class SortITCase(
+mode: TestExecutionMode,
+configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  @Test
+  def testOrderByDesc(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val t = env.fromElements((1, "First"), (3, "Third"), (2, 
"Second")).toTable(tEnv)
+  .orderBy('_1.desc)
+
+val expected = "3,Third\n2,Second\n1,First"
+val results = t.toDataSet[Row].setParallelism(1).collect()
+compareOrderedResultAsText(expected, results)
+  }
+
+  @Test
+  def testOrderByAsc(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val t = env.fromElements((1, "First"), (3, "Third"), (2, 
"Second")).toTable(tEnv)
+  .orderBy('_1.asc)
+
+val expected = "1,First\n2,Second\n3,Third"
+val results = t.toDataSet[Row].setParallelism(1).collect()
+compareOrderedResultAsText(expected, results)
+  }
+
+  @Test
+  def testOrderByMultipleFieldsDifferentDirections(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+env.setParallelism(2)
+val t = env.fromElements((1, 3, "Third"), (1, 2, "Fourth"), (1, 4, 
"Second"),
+  (2, 1, "Sixth"), (1, 5, "First"), (1, 1, "Fifth"))
+  .toTable(tEnv).orderBy('_1.asc, '_2.desc)
+
+val expected = 
"1,5,First\n1,4,Second\n1,3,Third\n1,2,Fourth\n1,1,Fifth\n2,1,Sixth"
+val results = t.toDataSet[Row].mapPartition(rows => 
Seq(rows.toSeq)).collect()
+
+implicit def rowOrdering = Ordering.by((x : Row) => 
(x.productElement(0).asInstanceOf[Int],
--- End diff --

Not sure about this comment. I think it would (in fact, I checked it)... 
Maybe you missed the next line (81)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3778] [shell] Forward configuration fro...

2016-04-25 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1906#issuecomment-214406242
  
Failing test cases are unrelated. Merging.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60935904
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task, that monitors a user-procided 
path, and assigns splits
+ * to downstream tasks for further reading and processing, depending on 
the user-provided {@link FileSplitMonitoringFunction.WatchType}.
+ *
+ * This method keeps track of which splits have already being processed by 
which task, and at which point
+ * in the file we are currently processing, at the granularity of the 
split. In addition, it keeps track
+ * of the last modification time for each file, so that it can detect new 
data.
+ */
+public class FileSplitMonitoringFunction<OUT>
+   extends RichSourceFunction<FileInputSplit> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered. This can be currently 
done in 3 ways.
+* {@code ONLY_NEW_FILES} which implies that only new files will 
be processed.
+* {@code REPROCESS_WITH_APPENDED} which reprocesses the whole 
file, as soon as new data is appended to it.
+*/
+   public enum WatchType {
+   REPROCESS_WITH_APPENDED // Reprocesses the whole file 
when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the 
parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat<OUT> format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}. */
+   private final WatchType watchType;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = true;
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat<OUT> format, String path,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this(format, path, FilePathFilter.DefaultFilter.getInstance(), 
watchType, readerParallelism, interval);
+   }
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat<OUT> format, String path, FilePathFilter filter,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this.format = Preconditions.checkNotNull(format);
+   this.path = Preconditions.checkNotNull(path);
+
+   Preconditions.checkArgument(interval >= 100,
+   "The specified monitoring interval is smaller than the 
minimum allowed one (100 ms).");
+   this.interval = interval;
+
+   this.watchType = watchType;
+
+   this.pathFilter = Preconditions.checkNotNull(filter);
+
+   

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60933644
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionTest.java
 ---
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+public class FileSplitMonitoringFunctionTest {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 200;
+
+   private File baseDir;
+
+   private org.apache.hadoop.fs.FileSystem hdfs;
+   private String hdfsURI;
+   private MiniDFSCluster hdfsCluster;
+
+   private Set<org.apache.hadoop.fs.Path> hdPaths = new HashSet<>();
+   private Set<String> hdPathNames = new HashSet<>();
+   private Map<String, String> hdPathContents = new HashMap<>();
+
+   //  PREPARING FOR THE TESTS
+
+   @Before
+   public void createHDFS() {
+   try {
+   baseDir = new 
File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+   FileUtil.fullyDelete(baseDir);
+
+   org.apache.hadoop.conf.Configuration hdConf = new 
org.apache.hadoop.conf.Configuration();
+   hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
baseDir.getAbsolutePath());
+   hdConf.set("dfs.block.size", String.valueOf(1048576)); 
// this is the minimum we can set.
+
+   MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(hdConf);
+   hdfsCluster = builder.build();
+
+   hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + 
":" + hdfsCluster.getNameNodePort() +"/";
+   hdfs = new 
org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+   } catch(Throwable e) {
+   e.printStackTrace();
+   Assert.fail("Test failed " + e.getMessage());
+   }
+   }
+
+   @After
+   public void destroyHDFS() {
+   try {
+   for(org.apache.hadoop.fs.Path file: hdPaths) {
+   hdfs.delete(file, false);
+   }
+   FileUtil.fullyDelete(baseDir);
+   hdfsCluster.shutdown();
+   } catch (IOException e) {
+   throw new RuntimeException(e);
+   }
+   }
+
+   //  END OF PREPARATIONS
+
+   //  TESTS
+
+   @Test
+   public void testFileContents() throws IOException {
--- End diff --

This test doesn't test anything with regard to the MonitoringFunction, does 
it?


---
If your project is set up for it, you 

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60934370
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionTest.java
 ---
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+public class FileSplitMonitoringFunctionTest {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 200;
+
+   private File baseDir;
+
+   private org.apache.hadoop.fs.FileSystem hdfs;
+   private String hdfsURI;
+   private MiniDFSCluster hdfsCluster;
+
+   private Set<org.apache.hadoop.fs.Path> hdPaths = new HashSet<>();
+   private Set<String> hdPathNames = new HashSet<>();
+   private Map<String, String> hdPathContents = new HashMap<>();
+
+   //  PREPARING FOR THE TESTS
+
+   @Before
+   public void createHDFS() {
+   try {
+   baseDir = new 
File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+   FileUtil.fullyDelete(baseDir);
+
+   org.apache.hadoop.conf.Configuration hdConf = new 
org.apache.hadoop.conf.Configuration();
+   hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
baseDir.getAbsolutePath());
+   hdConf.set("dfs.block.size", String.valueOf(1048576)); 
// this is the minimum we can set.
+
+   MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(hdConf);
+   hdfsCluster = builder.build();
+
+   hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + 
":" + hdfsCluster.getNameNodePort() +"/";
+   hdfs = new 
org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+   } catch(Throwable e) {
+   e.printStackTrace();
+   Assert.fail("Test failed " + e.getMessage());
+   }
+   }
+
+   @After
+   public void destroyHDFS() {
+   try {
+   for(org.apache.hadoop.fs.Path file: hdPaths) {
+   hdfs.delete(file, false);
+   }
+   FileUtil.fullyDelete(baseDir);
+   hdfsCluster.shutdown();
+   } catch (IOException e) {
+   throw new RuntimeException(e);
+   }
+   }
+
+   //  END OF PREPARATIONS
+
+   //  TESTS
+
+   @Test
+   public void testFileContents() throws IOException {
+   // validates the output
+   for (org.apache.hadoop.fs.Path file : hdPaths) {
+   

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

2016-04-25 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60931810
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.cep
+import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+import scala.reflect.ClassTag
+
+/**
+  * Base class for a pattern definition.
+  *
+  * A pattern definition is used by 
[[org.apache.flink.cep.nfa.compiler.NFACompiler]] to create
+  * a [[org.apache.flink.cep.nfa.NFA]].
+  *
+  * {{{
+  * Pattern pattern = Pattern.begin("start")
+  * .next("middle").subtype(F.class)
+  * .followedBy("end").where(new MyFilterFunction());
+  * }
+  * }}}
+  *
+  * @param jPattern Underlying Java API Pattern
+  * @tparam T Base type of the elements appearing in the pattern
+  * @tparam F Subtype of T to which the current pattern operator is 
constrained
+  */
+class Pattern[T: ClassTag, F <: T](jPattern: JPattern[T, F]) {
+
+  private[flink] def getWrappedPattern = jPattern
+
+  /**
+*
+* @return Name of the pattern operator
+*/
+  def getName: String = jPattern.getName
--- End diff --

In the Scala world one does not write Java-bean-like getters. Instead one 
would simply write `def name = jPattern.getName()`. Note that whenever we call 
Java functions from Scala we add parentheses even though they might not be 
needed. This is to underline that one is calling a Java function, which does 
not have the notion of purity.

If we really want to make the API identical, meaning that we offer functions 
like `getName` and `getWindowTime` in the Scala API as well, then we should add 
parentheses to their definitions to make clear that this is purposefully a Java-
bean-like getter definition.
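
In code, the two conventions side by side (just a sketch on top of the 
Pattern wrapper from this diff):

    // Scala-style accessor: no "get" prefix, no parentheses, reads like a field
    def name: String = jPattern.getName()

    // Java-bean-style getter kept for API parity: the explicit () signals
    // a call into the (potentially impure) Java API
    def getName(): String = jPattern.getName()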


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60932653
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/DefaultPathFilter.java
 ---
@@ -0,0 +1,7 @@
+package org.apache.flink.streaming.api.functions.source;
+
+/**
+ * Created by kkloudas on 4/25/16.
--- End diff --

What do you think? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

2016-04-25 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1905#issuecomment-214391606
  
Good work @StefanRRichter. I had only some minor comments left. Once these 
are addressed, the PR should be ready to be merged :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60932233
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task, that monitors a user-procided 
path, and assigns splits
+ * to downstream tasks for further reading and processing, depending on 
the user-provided {@link FileSplitMonitoringFunction.WatchType}.
+ *
+ * This method keeps track of which splits have already being processed by 
which task, and at which point
+ * in the file we are currently processing, at the granularity of the 
split. In addition, it keeps track
+ * of the last modification time for each file, so that it can detect new 
data.
+ */
+public class FileSplitMonitoringFunction<OUT>
+   extends RichSourceFunction<FileInputSplit> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered. This can be currently 
done in 3 ways.
+* {@code ONLY_NEW_FILES} which implies that only new files will 
be processed.
+* {@code REPROCESS_WITH_APPENDED} which reprocesses the whole 
file, as soon as new data is appended to it.
+*/
+   public enum WatchType {
+   REPROCESS_WITH_APPENDED // Reprocesses the whole file 
when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the 
parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat<OUT> format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}. */
+   private final WatchType watchType;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = true;
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat<OUT> format, String path,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this(format, path, FilePathFilter.DefaultFilter.getInstance(), 
watchType, readerParallelism, interval);
+   }
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat<OUT> format, String path, FilePathFilter filter,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this.format = Preconditions.checkNotNull(format);
+   this.path = Preconditions.checkNotNull(path);
+
+   Preconditions.checkArgument(interval >= 100,
+   "The specified monitoring interval is smaller than the 
minimum allowed one (100 ms).");
+   this.interval = interval;
+
+   this.watchType = watchType;
+
+   this.pathFilter = Preconditions.checkNotNull(filter);
+
+   

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60932263
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task, that monitors a user-procided 
path, and assigns splits
+ * to downstream tasks for further reading and processing, depending on 
the user-provided {@link FileSplitMonitoringFunction.WatchType}.
+ *
+ * This method keeps track of which splits have already being processed by 
which task, and at which point
+ * in the file we are currently processing, at the granularity of the 
split. In addition, it keeps track
+ * of the last modification time for each file, so that it can detect new 
data.
+ */
+public class FileSplitMonitoringFunction<OUT>
+   extends RichSourceFunction<FileInputSplit> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered. This can be currently 
done in 3 ways.
+* {@code ONLY_NEW_FILES} which implies that only new files will 
be processed.
+* {@code REPROCESS_WITH_APPENDED} which reprocesses the whole 
file, as soon as new data is appended to it.
+*/
+   public enum WatchType {
+   REPROCESS_WITH_APPENDED // Reprocesses the whole file 
when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the 
parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat<OUT> format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}. */
+   private final WatchType watchType;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = true;
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat<OUT> format, String path,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this(format, path, FilePathFilter.DefaultFilter.getInstance(), 
watchType, readerParallelism, interval);
+   }
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat<OUT> format, String path, FilePathFilter filter,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this.format = Preconditions.checkNotNull(format);
+   this.path = Preconditions.checkNotNull(path);
+
+   Preconditions.checkArgument(interval >= 100,
+   "The specified monitoring interval is smaller than the 
minimum allowed one (100 ms).");
+   this.interval = interval;
+
+   this.watchType = watchType;
+
+   this.pathFilter = Preconditions.checkNotNull(filter);
+
+   

[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

2016-04-25 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60932255
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.cep
+import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+import scala.reflect.ClassTag
+
+/**
+  * Base class for a pattern definition.
+  *
+  * A pattern definition is used by 
[[org.apache.flink.cep.nfa.compiler.NFACompiler]] to create
+  * a [[org.apache.flink.cep.nfa.NFA]].
+  *
+  * {{{
+  * Pattern pattern = Pattern.begin("start")
+  * .next("middle").subtype(F.class)
+  * .followedBy("end").where(new MyFilterFunction());
+  * }
+  * }}}
+  *
+  * @param jPattern Underlying Java API Pattern
+  * @tparam T Base type of the elements appearing in the pattern
+  * @tparam F Subtype of T to which the current pattern operator is 
constrained
+  */
+class Pattern[T: ClassTag, F <: T](jPattern: JPattern[T, F]) {
+
+  private[flink] def getWrappedPattern = jPattern
+
+  /**
+*
+* @return Name of the pattern operator
+*/
+  def getName: String = jPattern.getName
--- End diff --

I know this might be a bit confusing since you don't know the convention 
which is only "stated" implicitly. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-25 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1926#issuecomment-214388270
  
Thanks for the PR @dawidwys. I had a few comments. Please let me know if 
you have any questions.

Regarding a follow-up task, we have several open issues to extend / improve the 
Table API. For instance, you could add support for outer joins. That should not 
be too hard, because the DataSet API supports them natively. Alternatively, we 
will soon have an interface to define external tables, and there will be issues 
to implement the interface for different storage systems (HBase, ...) or 
formats (Parquet, ...).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60931033
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task, that monitors a user-procided 
path, and assigns splits
+ * to downstream tasks for further reading and processing, depending on 
the user-provided {@link FileSplitMonitoringFunction.WatchType}.
+ *
+ * This method keeps track of which splits have already being processed by 
which task, and at which point
--- End diff --

method? This is the class javadoc!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1929#issuecomment-214387476
  
Can you provide a rough description as to how the 
FileSplitMonitoringFunction works and how it interacts with the actual formats?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60930454
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/DefaultPathFilter.java
 ---
@@ -0,0 +1,7 @@
+package org.apache.flink.streaming.api.functions.source;
+
+/**
+ * Created by kkloudas on 4/25/16.
+ */
+public class DefaultPathFilter {
--- End diff --

This class is never used, as far as I can tell.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r60930292
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import 
org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class SortITCase(
+mode: TestExecutionMode,
+configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  @Test
+  def testOrderByDesc(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val t = env.fromElements((1, "First"), (3, "Third"), (2, 
"Second")).toTable(tEnv)
+  .orderBy('_1.desc)
+
+val expected = "3,Third\n2,Second\n1,First"
+val results = t.toDataSet[Row].setParallelism(1).collect()
+compareOrderedResultAsText(expected, results)
+  }
+
+  @Test
+  def testOrderByAsc(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val t = env.fromElements((1, "First"), (3, "Third"), (2, 
"Second")).toTable(tEnv)
+  .orderBy('_1.asc)
+
+val expected = "1,First\n2,Second\n3,Third"
+val results = t.toDataSet[Row].setParallelism(1).collect()
+compareOrderedResultAsText(expected, results)
+  }
+
+  @Test
+  def testOrderByMultipleFieldsDifferentDirections(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+env.setParallelism(2)
+val t = env.fromElements((1, 3, "Third"), (1, 2, "Fourth"), (1, 4, 
"Second"),
+  (2, 1, "Sixth"), (1, 5, "First"), (1, 1, "Fifth"))
+  .toTable(tEnv).orderBy('_1.asc, '_2.desc)
+
+val expected = 
"1,5,First\n1,4,Second\n1,3,Third\n1,2,Fourth\n1,1,Fifth\n2,1,Sixth"
+val results = t.toDataSet[Row].mapPartition(rows => 
Seq(rows.toSeq)).collect()
+
+implicit def rowOrdering = Ordering.by((x : Row) => 
(x.productElement(0).asInstanceOf[Int],
+  - x.productElement(1).asInstanceOf[Int]))
+
+val result = results.sortBy(p => p.min).reduceLeft(_ ++ _)
--- End diff --

The general method to check for the globally sorted result looks good. Can 
you do the check for the other tests in a similar way? Setting the parallelism 
of the sort operator to `1` does not really test the functionality, IMO. 
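
A sketch of how the other tests could do the same check (a hypothetical 
adaptation of the multi-field test above to a single sort field):

    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()

    // each collected partition is a locally sorted block; order the blocks
    // by their smallest element and concatenate to get the global order
    implicit val rowOrdering: Ordering[Row] =
      Ordering.by((r: Row) => r.productElement(0).asInstanceOf[Int])
    val result = results.filterNot(_.isEmpty).sortBy(_.min).reduceLeft(_ ++ _)

    compareOrderedResultAsText(expected, result)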


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r60929903
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import 
org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class SortITCase(
+mode: TestExecutionMode,
+configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  @Test
+  def testOrderByDesc(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val t = env.fromElements((1, "First"), (3, "Third"), (2, 
"Second")).toTable(tEnv)
+  .orderBy('_1.desc)
+
+val expected = "3,Third\n2,Second\n1,First"
+val results = t.toDataSet[Row].setParallelism(1).collect()
+compareOrderedResultAsText(expected, results)
+  }
+
+  @Test
+  def testOrderByAsc(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val t = env.fromElements((1, "First"), (3, "Third"), (2, 
"Second")).toTable(tEnv)
+  .orderBy('_1.asc)
+
+val expected = "1,First\n2,Second\n3,Third"
+val results = t.toDataSet[Row].setParallelism(1).collect()
+compareOrderedResultAsText(expected, results)
+  }
+
+  @Test
+  def testOrderByMultipleFieldsDifferentDirections(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+env.setParallelism(2)
+val t = env.fromElements((1, 3, "Third"), (1, 2, "Fourth"), (1, 4, 
"Second"),
+  (2, 1, "Sixth"), (1, 5, "First"), (1, 1, "Fifth"))
+  .toTable(tEnv).orderBy('_1.asc, '_2.desc)
+
+val expected = 
"1,5,First\n1,4,Second\n1,3,Third\n1,2,Fourth\n1,1,Fifth\n2,1,Sixth"
+val results = t.toDataSet[Row].mapPartition(rows => 
Seq(rows.toSeq)).collect()
+
+implicit def rowOrdering = Ordering.by((x : Row) => 
(x.productElement(0).asInstanceOf[Int],
--- End diff --

Can you change the `Ordering` such that it does not rely on the actual data 
(a value (2, 20, "Seventh") would not be sorted correctly)?
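
One way to make the ordering independent of the concrete values is to 
compose per-field orderings instead of negating the second field (a 
sketch, assuming the same Row accessors as in the test):

    // field 0 ascending, field 1 descending, with no arithmetic on the
    // data itself (negation breaks for Int.MinValue, for instance)
    implicit val rowOrdering: Ordering[Row] =
      Ordering.Tuple2(Ordering.Int, Ordering.Int.reverse).on { (x: Row) =>
        (x.productElement(0).asInstanceOf[Int],
         x.productElement(1).asInstanceOf[Int])
      }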


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60929807
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionTest.java
 ---
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+public class FileSplitMonitoringFunctionTest {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 200;
+
+   private File baseDir;
+
+   private org.apache.hadoop.fs.FileSystem hdfs;
+   private String hdfsURI;
+   private MiniDFSCluster hdfsCluster;
+
+   private Set<org.apache.hadoop.fs.Path> hdPaths = new HashSet<>();
+   private Set<String> hdPathNames = new HashSet<>();
+   private Map<String, String> hdPathContents = new HashMap<>();
+
+   //  PREPARING FOR THE TESTS
+
+   @Before
+   public void createHDFS() {
+   try {
+   baseDir = new 
File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+   FileUtil.fullyDelete(baseDir);
+
+   org.apache.hadoop.conf.Configuration hdConf = new 
org.apache.hadoop.conf.Configuration();
+   hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
baseDir.getAbsolutePath());
+   hdConf.set("dfs.block.size", String.valueOf(1048576)); 
// this is the minimum we can set.
+
+   MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(hdConf);
+   hdfsCluster = builder.build();
+
+   hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + 
":" + hdfsCluster.getNameNodePort() +"/";
+   hdfs = new 
org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+   } catch(Throwable e) {
+   e.printStackTrace();
+   Assert.fail("Test failed " + e.getMessage());
+   }
+   }
+
+   @After
+   public void destroyHDFS() {
+   try {
+   for(org.apache.hadoop.fs.Path file: hdPaths) {
+   hdfs.delete(file, false);
+   }
+   FileUtil.fullyDelete(baseDir);
+   hdfsCluster.shutdown();
+   } catch (IOException e) {
+   throw new RuntimeException(e);
+   }
+   }
+
+   //  END OF PREPARATIONS
+
+   //  TESTS
+
+   @Test
+   public void testFileContents() throws IOException {
+   // validates the output
+   for (org.apache.hadoop.fs.Path file : hdPaths) {
+   

[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60929665
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionTest.java
 ---
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+public class FileSplitMonitoringFunctionTest {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 200;
+
+   private File baseDir;
+
+   private org.apache.hadoop.fs.FileSystem hdfs;
+   private String hdfsURI;
+   private MiniDFSCluster hdfsCluster;
+
+   private Set<org.apache.hadoop.fs.Path> hdPaths = new HashSet<>();
+   private Set<String> hdPathNames = new HashSet<>();
+   private Map<String, String> hdPathContents = new HashMap<>();
+
+   //  PREPARING FOR THE TESTS
+
+   @Before
+   public void createHDFS() {
+   try {
+   baseDir = new 
File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+   FileUtil.fullyDelete(baseDir);
+
+   org.apache.hadoop.conf.Configuration hdConf = new 
org.apache.hadoop.conf.Configuration();
+   hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
baseDir.getAbsolutePath());
+   hdConf.set("dfs.block.size", String.valueOf(1048576)); 
// this is the minimum we can set.
+
+   MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(hdConf);
+   hdfsCluster = builder.build();
+
+   hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + 
":" + hdfsCluster.getNameNodePort() +"/";
+   hdfs = new 
org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+   } catch(Throwable e) {
+   e.printStackTrace();
+   Assert.fail("Test failed " + e.getMessage());
+   }
+   }
+
+   @After
+   public void destroyHDFS() {
+   try {
+   for(org.apache.hadoop.fs.Path file: hdPaths) {
+   hdfs.delete(file, false);
+   }
+   FileUtil.fullyDelete(baseDir);
+   hdfsCluster.shutdown();
+   } catch (IOException e) {
+   throw new RuntimeException(e);
+   }
+   }
+
+   //  END OF PREPARATIONS
+
+   //  TESTS
+
+   @Test
+   public void testFileContents() throws IOException {
+   // validates the output
+   for (org.apache.hadoop.fs.Path file : hdPaths) {
+   

[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r60929272
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import 
org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class SortITCase(
+mode: TestExecutionMode,
+configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  @Test
+  def testOrderByDesc(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv)
+  .orderBy('_1.desc)
+
+val expected = "3,Third\n2,Second\n1,First"
+val results = t.toDataSet[Row].setParallelism(1).collect()
+compareOrderedResultAsText(expected, results)
+  }
+
+  @Test
+  def testOrderByAsc(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv)
+  .orderBy('_1.asc)
+
+val expected = "1,First\n2,Second\n3,Third"
+val results = t.toDataSet[Row].setParallelism(1).collect()
+compareOrderedResultAsText(expected, results)
+  }
+
+  @Test
+  def testOrderByMultipleFieldsDifferentDirections(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+env.setParallelism(2)
+val t = env.fromElements((1, 3, "Third"), (1, 2, "Fourth"), (1, 4, "Second"),
+  (2, 1, "Sixth"), (1, 5, "First"), (1, 1, "Fifth"))
+  .toTable(tEnv).orderBy('_1.asc, '_2.desc)
+
+val expected = "1,5,First\n1,4,Second\n1,3,Third\n1,2,Fourth\n1,1,Fifth\n2,1,Sixth"
+val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
+
+implicit def rowOrdering = Ordering.by((x : Row) => (x.productElement(0).asInstanceOf[Int],
+  - x.productElement(1).asInstanceOf[Int]))
+
+val result = results.sortBy(p => p.min).reduceLeft(_ ++ _)
+
+compareOrderedResultAsText(expected, result)
+  }
+
+  @Test
+  def testOrderByMultipleFieldsWithSql(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val sqlQuery = "SELECT * FROM MyTable ORDER BY _1 DESC, _2 DESC"
+
+val t = env.fromElements((1, 1, "First"), (2, 3, "Fourth"), (1, 2, "Second"),
+  (2, 1, "Third")).toTable(tEnv)
+tEnv.registerDataSet("MyTable", t)
+
+val queryResult = tEnv.sql(sqlQuery)
+
+val expected = "2,3,Fourth\n2,1,Third\n1,2,Second\n1,1,First"
+val results = queryResult.toDataSet[Row].setParallelism(1).collect()
+compareOrderedResultAsText(expected, results)
+  }
+
+  private def compareOrderedResultAsText[T](expected: String, results: Seq[T]) = {
--- End diff --

If we always use `toDataSet[Row]` and respect 

[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r60929017
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import 
org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class SortITCase(
+mode: TestExecutionMode,
+configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  @Test
+  def testOrderByDesc(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv)
+  .orderBy('_1.desc)
+
+val expected = "3,Third\n2,Second\n1,First"
+val results = t.toDataSet[Row].setParallelism(1).collect()
--- End diff --

I think this changes the parallelism of the sort operator to `1`.
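
To illustrate the point (a sketch, not the PR's code): `setParallelism` configures the operator that produced the `DataSet`, so here it forces the translated sort itself to run with parallelism 1, rather than just fetching the result in order:

```scala
val ds = t.toDataSet[Row]  // the translated sort operator
ds.setParallelism(1)       // changes the sort's parallelism, not the collect
val results = ds.collect()
```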


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60928894
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunctionITCase.java
 ---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class FileSplitMonitoringFunctionITCase extends 
StreamingProgramTestBase {
+
+   private static final int NO_OF_FILES = 10;
+   private static final int LINES_PER_FILE = 10;
+
+   private static final long INTERVAL = 200;
+
+   private File baseDir;
+
+   private org.apache.hadoop.fs.FileSystem hdfs;
+   private String hdfsURI;
+   private MiniDFSCluster hdfsCluster;
--- End diff --

is it really necessary to spin up a DFS cluster for this test? It should work just as well with local files, no?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r60928727
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import 
org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class SortITCase(
+mode: TestExecutionMode,
+configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  @Test
+  def testOrderByDesc(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
--- End diff --

Can you set the parallelism of the environment to 4 to ensure that the 
tests are executed in parallel?
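
A minimal sketch of the suggested setup (same names as in the quoted test):

```scala
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)  // make sure the sort actually runs with parallel tasks
val tEnv = TableEnvironment.getTableEnvironment(env, config)
```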


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3229] Flink streaming consumer for AWS ...

2016-04-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1911#discussion_r60928569
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis;
+
+import com.amazonaws.regions.Regions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import 
org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
+import 
org.apache.flink.streaming.connectors.kinesis.config.CredentialProviderType;
+import 
org.apache.flink.streaming.connectors.kinesis.config.InitialPosition;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
+import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The Flink Kinesis Consumer is a parallel streaming data source that 
pulls data from multiple AWS Kinesis streams
+ * within the same AWS service region. Each instance of the consumer is 
responsible for fetching data records from
+ * one or more Kinesis shards.
+ *
+ * To leverage Flink's checkpointing mechanics for exactly-once 
streaming processing guarantees, the Flink Kinesis
+ * consumer is implemented with the AWS Java SDK, instead of the 
officially recommended AWS Kinesis Client Library, for
+ * low-level control on the management of stream state. The Flink Kinesis 
Connector also supports setting the initial
+ * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST:
+ *
+ * 
+ * 
+ *
+ * NOTE: The current implementation does not correctly handle 
resharding of AWS Kinesis streams.
+ * NOTE: Since Kinesis and Kafka share many common abstractions, 
the implementation is heavily based on
+ * the Flink Kafka Consumer.
+ *
+ * @param <T> the type of data emitted
+ */
+public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
+   implements CheckpointListener, CheckpointedAsynchronously<HashMap<KinesisStreamShard, String>>, ResultTypeQueryable<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisConsumer.class);
+
+   // 

+   //  Consumer properties
+   // 

+
+   /** The complete list of shards */
+   private final List<KinesisStreamShard> shards;
+
+   /** Properties to 

[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r60928598
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SortITCase.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import 
org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class SortITCase(
+mode: TestExecutionMode,
+configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  @Test
+  def testOrderByDesc(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val t = env.fromElements((1, "First"), (3, "Third"), (2, "Second")).toTable(tEnv)
--- End diff --

I would use more than three records to test the sorting. Can you use the 
`CollectionDataSets.get3TupleDataSet()` here?
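
A sketch of the suggestion, assuming the `CollectionDataSets` helper from Flink's Scala test utilities is on the classpath here:

```scala
import org.apache.flink.api.scala.util.CollectionDataSets

// 21 records instead of 3, so the sort is exercised on non-trivial input.
val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
  .orderBy('_1.desc)
```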


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60928614
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task that monitors a user-provided path and assigns splits
+ * to downstream tasks for further reading and processing, depending on the user-provided {@link FileSplitMonitoringFunction.WatchType}.
+ *
+ * This class keeps track of which splits have already been processed by which task, and at which point
+ * in the file we are currently processing, at the granularity of the split. In addition, it keeps track
+ * of the last modification time for each file, so that it can detect new data.
+ */
+public class FileSplitMonitoringFunction
+   extends RichSourceFunction {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered. This can be currently 
done in 3 ways.
+* {@code ONLY_NEW_FILES} which implies that only new files will 
be processed.
+* {@code REPROCESS_WITH_APPENDED} which reprocesses the whole 
file, as soon as new data is appended to it.
+*/
+   public enum WatchType {
--- End diff --

where is ONLY_NEW_FILES?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60928509
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task that monitors a user-provided path and assigns splits
+ * to downstream tasks for further reading and processing, depending on the user-provided {@link FileSplitMonitoringFunction.WatchType}.
+ *
+ * This class keeps track of which splits have already been processed by which task, and at which point
+ * in the file we are currently processing, at the granularity of the split. In addition, it keeps track
+ * of the last modification time for each file, so that it can detect new data.
+ */
+public class FileSplitMonitoringFunction
+   extends RichSourceFunction {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered. This can be currently 
done in 3 ways.
--- End diff --

I would remove the ```This can be currently done in 3 ways.``` part. It 
doesn't really add anything to the description, and is likely to become 
outdated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60928251
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.Path;
+
+import java.io.Serializable;
+
+public interface FilePathFilter extends Serializable {
+
+   /**
+* @return {@code true} if the {@code filePath} given is to be
+* ignored when processing a directory, e.g.
+* 
+* {@code
+*
+* public boolean filterPaths(Path filePath) {
+* return filePath.getName().startsWith(".") || 
filePath.getName().contains("_COPYING_");
+* }
+* }
+* */
+   boolean filterPaths(Path filePath);
+
+
+   /**
+* The default file path filtering method; it is used
+* if no other such function is provided. This filter leaves out
+* files starting with ".", "_", and "_COPYING_".
+* */
+   public class DefaultFilter implements FilePathFilter {
+
+   private static DefaultFilter instance = null;
+
+   DefaultFilter() {
+
--- End diff --

unnecessary empty line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60928155
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.Path;
+
+import java.io.Serializable;
+
+public interface FilePathFilter extends Serializable {
+
+   /**
+* @return {@code true} if the {@code filePath} given is to be
+* ignored when processing a directory, e.g.
+* 
+* {@code
+*
+* public boolean filterPaths(Path filePath) {
+* return filePath.getName().startsWith(".") || 
filePath.getName().contains("_COPYING_");
+* }
+* }
+* */
+   boolean filterPaths(Path filePath);
--- End diff --

IMO this method should be called ```filterPath``` since it only receives a 
single path.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60928227
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.Path;
+
+import java.io.Serializable;
+
+public interface FilePathFilter extends Serializable {
+
+   /**
+* @return {@code true} if the {@code filePath} given is to be
+* ignored when processing a directory, e.g.
+* 
+* {@code
+*
+* public boolean filterPaths(Path filePath) {
+* return filePath.getName().startsWith(".") || 
filePath.getName().contains("_COPYING_");
+* }
+* }
+* */
+   boolean filterPaths(Path filePath);
+
+
+   /**
+* The default file path filtering method; it is used
+* if no other such function is provided. This filter leaves out
+* files starting with ".", "_", and "_COPYING_".
+* */
+   public class DefaultFilter implements FilePathFilter {
+
+   private static DefaultFilter instance = null;
+
+   DefaultFilter() {
+
+   }
+
+   public static DefaultFilter getInstance() {
+   if(instance == null) {
--- End diff --

missing space after if


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60928005
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/DefaultPathFilter.java
 ---
@@ -0,0 +1,7 @@
+package org.apache.flink.streaming.api.functions.source;
+
+/**
+ * Created by kkloudas on 4/25/16.
--- End diff --

did you mean to include this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r60928086
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelCollation, RelNode, SingleRel}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.operators.{DataSource, Operator, 
PartitionOperator}
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.typeutils.TypeConverter._
+
+import scala.collection.JavaConverters._
+
+class DataSetSort(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inp: RelNode,
+collations: RelCollation,
+rowType: RelDataType)
+  extends SingleRel(cluster, traitSet, inp)
+  with DataSetRel{
+
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode ={
+new DataSetSort(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  collations,
+  rowType
+)
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]] = None): DataSet[Any] = {
--- End diff --

We are ignoring the `expectedType`. The convention here is to return a 
`DataSet` of that type.
If the input type (`inputDS.getType()`) is not equal to the expected type, 
we need to add a Map function after the sort, which translates the records into 
the expected type. See `DataSetSource` for an example of how to do that.
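
A rough sketch of the suggested shape (not the PR's code; `convertToExpected` is a hypothetical stand-in for the conversion logic that `DataSetSource` uses):

```scala
// If the caller asked for a type other than the input type, append a
// Map operator after the sort that performs the conversion.
val sorted = partitionedDs.javaSet  // Java DataSet[Any], as translateToPlan returns
val result: DataSet[Any] = expectedType match {
  case Some(tpe) if sorted.getType() != tpe =>
    sorted.map(new MapFunction[Any, Any] {
      override def map(r: Any): Any = convertToExpected(r, tpe) // hypothetical helper
    }).returns(tpe)
  case _ =>
    sorted
}
```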


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60927886
  
--- Diff: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
 ---
@@ -458,7 +458,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* every 100 milliseconds.
*
*/
-  def readFileStream(StreamPath: String, intervalMillis: Long = 100, 
--- End diff --

unrelated change


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Refactoring the File Monitoring Source.

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60927783
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task that monitors a user-provided path and assigns splits
+ * to downstream tasks for further reading and processing, depending on the user-provided {@link FileSplitMonitoringFunction.WatchType}.
+ *
+ * This class keeps track of which splits have already been processed by which task, and at which point
+ * in the file we are currently processing, at the granularity of the split. In addition, it keeps track
+ * of the last modification time for each file, so that it can detect new data.
+ */
+public class FileSplitMonitoringFunction
+   extends RichSourceFunction {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitMonitoringFunction.class);
+
+   /**
+* Specifies when computation will be triggered. This can be currently 
done in 3 ways.
+* {@code ONLY_NEW_FILES} which implies that only new files will 
be processed.
+* {@code REPROCESS_WITH_APPENDED} which reprocesses the whole 
file, as soon as new data is appended to it.
+*/
+   public enum WatchType {
+   REPROCESS_WITH_APPENDED // Reprocesses the whole file 
when new data is appended.
+   }
+
+   /** The path to monitor. */
+   private final String path;
+
+   /** The default parallelism for the job, as this is going to be the 
parallelism of the downstream readers. */
+   private final int readerParallelism;
+
+   /** The {@link FileInputFormat} to be read. */
+   private FileInputFormat format;
+
+   /** How often to monitor the state of the directory for new data. */
+   private final long interval;
+
+   /** Which new data to process (see {@link WatchType}). */
+   private final WatchType watchType;
+
+   private long globalModificationTime;
+
+   private FilePathFilter pathFilter;
+
+   private volatile boolean isRunning = true;
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat format, String path,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this(format, path, FilePathFilter.DefaultFilter.getInstance(), 
watchType, readerParallelism, interval);
+   }
+
+   public FileSplitMonitoringFunction(
+   FileInputFormat format, String path, FilePathFilter filter,
+   WatchType watchType, int readerParallelism, long interval) {
+
+   this.format = Preconditions.checkNotNull(format);
+   this.path = Preconditions.checkNotNull(path);
+
+   Preconditions.checkArgument(interval >= 100,
+   "The specified monitoring interval is smaller than the 
minimum allowed one (100 ms).");
+   this.interval = interval;
+
+   this.watchType = watchType;
+
+   this.pathFilter = Preconditions.checkNotNull(filter);
+
+   

[jira] [Commented] (FLINK-3806) Revert use of DataSet.count() in Gelly

2016-04-25 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15256433#comment-15256433
 ] 

Aljoscha Krettek commented on FLINK-3806:
-

Correct, I think the {{count()}}/{{collect()}} methods are somewhat dangerous 
as long as we recompute them every time.
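
A small sketch of the hazard (illustrative only, Scala DataSet API; `parseVertex` is a hypothetical prep function): `count()` eagerly executes a job, so the lineage feeding it runs once for the count and again for the actual program:

```scala
val vertices = env.readTextFile("vertices.txt").map(parseVertex _) // parseVertex: hypothetical
val n: Long = vertices.count()  // job #1: reads and parses just for the count
val ids = vertices.map(_._1)    // job #2: reads and parses again
ids.collect()
```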

> Revert use of DataSet.count() in Gelly
> --
>
> Key: FLINK-3806
> URL: https://issues.apache.org/jira/browse/FLINK-3806
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Priority: Critical
>
> FLINK-1632 replaced {{GraphUtils.count}} with {{DataSetUtils.count}}. The 
> former returns a {{DataSet}} while the latter executes a job to return a Java 
> value.
> {{DataSetUtils.count}} is called from {{Graph.numberOfVertices}} and 
> {{Graph.numberOfEdges}} which are called from {{GatherSumApplyIteration}} and 
> {{ScatterGatherIteration}} as well as the {{PageRank}} algorithms when the 
> user does not pass the number of vertices as a parameter.
> As noted in FLINK-1632, this does make the code simpler but if my 
> understanding is correct will materialize the Graph twice. The Graph will 
> need to be reread from input, regenerated, or recomputed by preceding 
> algorithms.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r60927218
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelCollation, RelNode, SingleRel}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.operators.{DataSource, Operator, 
PartitionOperator}
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.typeutils.TypeConverter._
+
+import scala.collection.JavaConverters._
+
+class DataSetSort(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inp: RelNode,
+collations: RelCollation,
+rowType: RelDataType)
+  extends SingleRel(cluster, traitSet, inp)
+  with DataSetRel{
+
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode ={
+new DataSetSort(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  collations,
+  rowType
+)
+  }
+
+  override def translateToPlan(
+  tableEnv: BatchTableEnvironment,
+  expectedType: Option[TypeInformation[Any]] = None): DataSet[Any] = {
+
+val config = tableEnv.getConfig
+
+val inputDS = wrapDataSet(inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv))
+
+val returnType = determineReturnType(
+  getRowType,
+  expectedType,
+  config.getNullCheck,
+  config.getEfficientTypeUsage
+)
+
+val fieldCollations = collations.getFieldCollations.asScala
+  .map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))
+
+val currentParallelism = if (inputDS.getParallelism >= 1) {
+  inputDS.getParallelism
+} else {
+  inputDS.getExecutionEnvironment.getParallelism
+}
+
+var partitionedDs = if (currentParallelism == 1) {
+  inputDS.setParallelism(1)
+} else {
+  wrapDataSet(inputDS.partitionByRange(fieldCollations.map(_._1): _*).javaSet
+.asInstanceOf[PartitionOperator[Any]]
+.withOrders(fieldCollations.map(_._2): _*))
+}
+
+fieldCollations.foreach { fieldCollation =>
+  partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2)
+}
+
+partitionedDs.javaSet
+  }
+
+  private def directionToOrder(direction: Direction) = {
+direction match {
+  case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => 
Order.ASCENDING
+  case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => 
Order.DESCENDING
+}
+
+  }
+
+  private def wrapDataSet(dataSet: DataSet[Any]) = {
--- End diff --

We do not need to wrap the Java `DataSet` into a Scala `DataSet`. The other `DataSetRel`s also deal only with Java `DataSet`s. This also means we do not need to unwrap it with `javaSet`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r60926920
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelCollation, RelNode, SingleRel}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.operators.{DataSource, Operator, PartitionOperator}
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.typeutils.TypeConverter._
+
+import scala.collection.JavaConverters._
+
+class DataSetSort(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inp: RelNode,
+    collations: RelCollation,
+    rowType: RelDataType)
+  extends SingleRel(cluster, traitSet, inp)
+  with DataSetRel {
+
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new DataSetSort(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      collations,
+      rowType
+    )
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]] = None): DataSet[Any] = {
+
+    val config = tableEnv.getConfig
+
+    val inputDS = wrapDataSet(inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv))
+
+    val returnType = determineReturnType(
+      getRowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage
+    )
+
+    val fieldCollations = collations.getFieldCollations.asScala
+      .map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))
+
+    val currentParallelism = if (inputDS.getParallelism >= 1) {
--- End diff --

I am not sure about the handling of the parallelism here. I would only 
check whether the default parallelism of the environment is set to 1. If it 
is not, we add a range partition operator; if it is, we don't.
Parallelism is not really handled by the Table API at the moment. This is 
something that we need to improve in the future, but not as part of this PR, IMO.
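
For illustration, a sketch of that check (using the names from the diff; 
untested):

```
// Sketch only: add the range partitioner based solely on the
// environment's default parallelism, as suggested above.
val partitionedDs = if (inputDS.getExecutionEnvironment.getParallelism == 1) {
  inputDS  // a single partition only needs the local sortPartition calls
} else {
  inputDS
    .partitionByRange(fieldCollations.map(_._1): _*)
    .withOrders(fieldCollations.map(_._2): _*)
}
```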


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Watch type proper

2016-04-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1929#discussion_r60926922
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSplitMonitoringFunction.java
 ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) task, that monitors a user-procided 
path, and assigns splits
--- End diff --

typo: procided -> provided


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r60926405
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelCollation, RelNode, SingleRel}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.operators.{DataSource, Operator, PartitionOperator}
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.typeutils.TypeConverter._
+
+import scala.collection.JavaConverters._
+
+class DataSetSort(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inp: RelNode,
+    collations: RelCollation,
+    rowType: RelDataType)
+  extends SingleRel(cluster, traitSet, inp)
+  with DataSetRel {
+
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new DataSetSort(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      collations,
+      rowType
+    )
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]] = None): DataSet[Any] = {
+
+    val config = tableEnv.getConfig
+
+    val inputDS = wrapDataSet(inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv))
+
+    val returnType = determineReturnType(
+      getRowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage
+    )
+
+    val fieldCollations = collations.getFieldCollations.asScala
+      .map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))
+
+    val currentParallelism = if (inputDS.getParallelism >= 1) {
+      inputDS.getParallelism
+    } else {
+      inputDS.getExecutionEnvironment.getParallelism
+    }
+
+    var partitionedDs = if (currentParallelism == 1) {
+      inputDS.setParallelism(1)
--- End diff --

I don't think we should set the parallelism of the input operator to `1`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Watch type proper

2016-04-25 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1929#issuecomment-214373041
  
What's with the PR title? :confused: 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r60926030
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -401,6 +400,52 @@ class Table(
 new Table(relBuilder.build(), tableEnv)
   }
 
+  /**
+* Sorts the given [[Table]]. Similar to SQL ORDER BY. The sorting is 
performed locally across
+* all partitions with keys equal to the given fields.
+*
+* Example:
+*
+* {{{
+*   tab.orderBy('name)
--- End diff --

Add `desc` to the example to show how it is used.
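
For example (assuming the PR exposes the new `Desc` expression as a `.desc` 
call on field references):

```
tab.orderBy('name.desc)
```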


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r60925922
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -401,6 +400,52 @@ class Table(
 new Table(relBuilder.build(), tableEnv)
   }
 
+  /**
+* Sorts the given [[Table]]. Similar to SQL ORDER BY. The sorting is 
performed locally across
--- End diff --

I think the following sentence can be confusing
> The sorting is performed locally across all partitions with keys equal to 
the given fields.

How about
> The resulting Table is globally sorted across all parallel partitions.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

2016-04-25 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60925702
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java 
---
@@ -99,9 +116,25 @@
null,
false);
 
+   return flatSelect(patternFlatSelectFunction, outTypeInfo);
+   }
+
+   /**
+    * Applies a flat select function to the detected pattern sequence. For each pattern sequence
+    * the provided {@link PatternFlatSelectFunction} is called. The pattern flat select function
+    * can produce an arbitrary number of resulting elements.
+    *
+    * @param patternFlatSelectFunction The pattern flat select function which is called for each
+    *                                  detected pattern sequence.
+    * @param <R> Type of the resulting elements
+    * @param outTypeInfo Explicit specification of output type.
+    * @return {@link DataStream} which contains the resulting elements from the pattern flat select
+    *         function.
+    */
+   public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) {
return patternStream.flatMap(
-   new PatternFlatSelectMapper(
-   
patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction)
+   new PatternFlatSelectMapper<>(
+   
patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction)
--- End diff --

Indentation is off here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

2016-04-25 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60925320
  
--- Diff: 
flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabiliyTest.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import org.apache.flink.api.common.functions.util.ListCollector
+import org.apache.flink.cep.scala.pattern.Pattern
+import org.apache.flink.streaming.api.operators.{StreamFlatMap, StreamMap}
+import org.apache.flink.streaming.api.scala._
+import 
org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.util.{Collector, TestLogger}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import org.junit.Assert._
+import org.junit.Test
+
+class PatternStreamScalaJavaAPIInteroperabiliyTest extends TestLogger {
+
+
+  @Test
+  @throws[Exception]
+  def testScalaJavaAPISelectFunForwarding {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+    val dummyDataStream: DataStream[(Int, Int)] = env.fromElements()
+    val pattern: Pattern[(Int, Int), _] = Pattern.begin[(Int, Int)]("dummy")
+    val pStream: PatternStream[(Int, Int)] = CEP.pattern(dummyDataStream, pattern)
+    val param = mutable.Map("begin" ->(1, 2)).asJava
+    val result: DataStream[(Int, Int)] = pStream
+      .select((pattern: mutable.Map[String, (Int, Int)]) => {
+        //verifies input parameter forwarding
+        assertEquals(param, pattern.asJava)
+        param.get("begin")
+      })
+    val out = extractUserFunction[StreamMap[java.util.Map[String, (Int, Int)], (Int, Int)]](result)
+      .getUserFunction.map(param)
+    //verifies output parameter forwarding
+    assertEquals(param.get("begin"), out)
+  }
+
+  @Test
+  @throws[Exception]
+  def testScalaJavaAPIFlatSelectFunForwarding {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+    val dummyDataStream: DataStream[List[Int]] = env.fromElements()
+    val pattern: Pattern[List[Int], _] = Pattern.begin[List[Int]]("dummy")
+    val pStream: PatternStream[List[Int]] = CEP.pattern(dummyDataStream, pattern)
+    val inList = List(1, 2, 3)
+    val inParam = mutable.Map("begin" -> inList).asJava
+    val outList = new java.util.ArrayList[List[Int]]
+    val outParam = new ListCollector[List[Int]](outList)
+
+    val result: DataStream[List[Int]] = pStream
+
+      .flatSelect((pattern: mutable.Map[String, List[Int]], out: Collector[List[Int]]) => {
+        //verifies input parameter forwarding
+        assertEquals(inParam, pattern.asJava)
+        out.collect(pattern.get("begin").get)
+      })
+
+    extractUserFunction[StreamFlatMap[java.util.Map[String, List[Int]], List[Int]]](result).
+      getUserFunction.flatMap(inParam, outParam)
+    //verify output parameter forwarding and that flatMap function was actually called
+    assertEquals(inList, outList.get(0))
+  }
+
+  def extractUserFunction[T](dataStream: DataStream[_]) = {
+    dataStream.javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[_, _]]
+      .getOperator
+      .asInstanceOf[T]
+  }
--- End diff --

Good tests :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

2016-04-25 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60925045
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
 ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import java.util.{Map => JMap}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction,
+PatternStream => JPatternStream}
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.scala.asScalaStream
+import org.apache.flink.util.Collector
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+  * Stream abstraction for CEP pattern detection. A pattern stream is a 
stream which emits detected
+  * pattern sequences as a map of events associated with their names. The 
pattern is detected using
+  * a [[org.apache.flink.cep.nfa.NFA]]. In order to process the detected 
sequences, the user has to
+  * specify a [[PatternSelectFunction]] or a [[PatternFlatSelectFunction]].
+  *
+  * @param jPatternStream Underlying pattern stream from Java API
+  * @tparam T Type of the events
+  */
+class PatternStream[T: TypeInformation](jPatternStream: JPatternStream[T]) {
--- End diff --

`TypeInformation` is not needed
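
A sketch of the class header without the context bound (the methods keep 
delegating to the wrapped stream):

```
// Sketch only: no TypeInformation context bound is required, since the
// wrapper simply delegates to the already-typed Java PatternStream.
class PatternStream[T](jPatternStream: JPatternStream[T]) {
  // ... select/flatSelect delegate to jPatternStream as before ...
}
```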


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3808) Refactor the whole file monitoring source to take a fileInputFormat as an argument.

2016-04-25 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15256418#comment-15256418
 ] 

Kostas Kloudas commented on FLINK-3808:
---

PR https://github.com/apache/flink/pull/1929 addresses this.

> Refactor the whole file monitoring source to take a fileInputFormat as an 
> argument.
> ---
>
> Key: FLINK-3808
> URL: https://issues.apache.org/jira/browse/FLINK-3808
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>
> This issue is just an intermediate step towards making the file source 
> fault-tolerant.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

2016-04-25 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60924860
  
--- Diff: flink-libraries/flink-cep-scala/pom.xml ---
@@ -0,0 +1,209 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-libraries</artifactId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-cep-scala_2.10</artifactId>
+    <name>flink-cep-scala</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cep_2.10</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-tests_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.10</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
--- End diff --

I think `flink-streaming-java_2.10` is not needed as a dependency.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Watch type proper

2016-04-25 Thread kl0u
Github user kl0u commented on the pull request:

https://github.com/apache/flink/pull/1929#issuecomment-214366444
  
Addresses FLINK-3808.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r60924404
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetSort}
+
+class DataSetSortRule
--- End diff --

Our implementation of `DataSetSort` does not support offset and limit 
(which is fine and can be added later, IMO). Consequently, we can only 
translate `LogicalSort` into `DataSetSort` if `offset` and `limit` are both 
null.

We need to override the `matches` method and only return `true` if `offset` 
and `limit` are both null. See `DataSetJoinRule`, for instance, which checks 
whether the join type is `JoinType.INNER`.
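
A minimal sketch of such a `matches` override (untested; Calcite's 
`LogicalSort` exposes the offset and limit as `offset` and `fetch`):

```
override def matches(call: RelOptRuleCall): Boolean = {
  val sort = call.rel(0).asInstanceOf[LogicalSort]
  // translate only pure sorts, i.e. no OFFSET and no LIMIT/FETCH
  sort.offset == null && sort.fetch == null
}
```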


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Watch type proper

2016-04-25 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/1929

Watch type proper

This PR is for FLINK-3808. It refactors the file monitoring sources to take 
a FileInputFormat as an argument and to work at the granularity of a split 
rather than that of a file. It does not yet change the API calls, which still 
call the old code; this will come in a following PR.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink watch_type_proper

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1929.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1929


commit f3b96b72eae877002fcd5f4699575d0e7cd928d4
Author: kl0u 
Date:   2016-04-18T14:37:54Z

First commit in implementing the Monitor.

commit c4f6f8c94b8501853a9fb06097b18fc378ea62ec
Author: kl0u 
Date:   2016-04-19T11:07:08Z

ToRemove

commit 2405a2131a84d42fac25d34fb5c32d30a5710d64
Author: kl0u 
Date:   2016-04-19T12:00:27Z

Removed unused classes.

commit 80ce550134d968fa833ddb3219d64d2bbe902318
Author: kl0u 
Date:   2016-04-19T13:03:01Z

Reverted classes back to master version.

commit f7a89c90aee28d0d8fa074a3319318b32617e494
Author: kl0u 
Date:   2016-04-19T16:24:18Z

A version of the monitor.

commit b9ed3159beed5a08ec96876c8d43781a738da3cb
Author: kl0u 
Date:   2016-04-20T13:27:09Z

A non-faultolerant version that compiles.

commit 2ab79bf158d6974eea0f4220c51fc8d8e6c767dd
Author: kl0u 
Date:   2016-04-20T14:46:27Z

Adding checkpointing functionality.

commit d920fadaefa64f16342359430cbc831c94a8321c
Author: kl0u 
Date:   2016-04-20T15:49:48Z

Adding Tests.

commit f82948161317000bf5f4018c492e12df2f6237d9
Author: kl0u 
Date:   2016-04-22T11:56:13Z

Fixed Reader bug.

commit 58df5ae96d48470c944ef2852b606bffe6bcbd70
Author: kl0u 
Date:   2016-04-24T11:16:07Z

Restructuring the tests.

commit a10109354ea8858c94c04758d5f849fa738532c7
Author: kl0u 
Date:   2016-04-24T21:19:11Z

Adding the map with the timestamp.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

2016-04-25 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60924299
  
--- Diff: 
flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabiliyTest.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import org.apache.flink.api.common.functions.util.ListCollector
+import org.apache.flink.cep.scala.pattern.Pattern
+import org.apache.flink.streaming.api.operators.{StreamFlatMap, StreamMap}
+import org.apache.flink.streaming.api.scala._
+import 
org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.util.{Collector, TestLogger}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import org.junit.Assert._
+import org.junit.Test
+
+class PatternStreamScalaJavaAPIInteroperabiliyTest extends TestLogger {
+
+
--- End diff --

two line breaks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

2016-04-25 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60924213
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import org.apache.flink.cep.pattern.{FollowedByPattern => 
JFollowedByPattern, Pattern => JPattern}
+
+import scala.reflect.ClassTag
+
+package object pattern {
+  /**
+    * Utility method to wrap [[org.apache.flink.cep.pattern.Pattern]] and its subclasses
+    * for usage with the Scala API.
+    *
+    * @param javaPattern The underlying pattern from the Java API
+    * @tparam T Base type of the elements appearing in the pattern
+    * @tparam F Subtype of T to which the current pattern operator is constrained
+    * @return A pattern from the Scala API which wraps the pattern from the Java API
+    */
+  private[flink] def wrapPattern[T: ClassTag, F <: T](javaPattern: JPattern[T, F])
--- End diff --

`ClassTag` should not be necessary if `FollowedByPattern` and `Pattern` are 
adapted accordingly.
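
For illustration, one possible shape of the wrapper without the `ClassTag` (a 
sketch; it assumes `FollowedByPattern` and `Pattern` have been adapted as 
described, and that the body dispatches on the Java pattern type):

```
// Sketch only: the ClassTag context bound is dropped; nothing in the
// body needs runtime class information.
private[flink] def wrapPattern[T, F <: T](javaPattern: JPattern[T, F]): Pattern[T, F] =
  javaPattern match {
    case f: JFollowedByPattern[T, F] => FollowedByPattern(f)
    case p: JPattern[T, F] => new Pattern(p)
  }
```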


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-3808) Refactor the whole file monitoring source to take a fileInputFormat as an argument.

2016-04-25 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-3808:
-

 Summary: Refactor the whole file monitoring source to take a 
fileInputFormat as an argument.
 Key: FLINK-3808
 URL: https://issues.apache.org/jira/browse/FLINK-3808
 Project: Flink
  Issue Type: Sub-task
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas


This issue is just an intermediate step towards making the file source 
fault-tolerant.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

2016-04-25 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60924026
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.cep.pattern.{FollowedByPattern => 
JFollowedByPattern}
+
+import scala.reflect.ClassTag
+
+object FollowedByPattern {
+  /**
+    * Constructs a new Pattern by wrapping a given Java API Pattern
+    *
+    * @param jfbPattern Underlying Java API Pattern.
+    * @tparam T Base type of the elements appearing in the pattern
+    * @tparam F Subtype of T to which the current pattern operator is constrained
+    * @return New wrapping FollowedByPattern object
+    */
+  def apply[T: ClassTag, F <: T](jfbPattern: JFollowedByPattern[T, F]) =
--- End diff --

I think the `ClassTag` should not be necessary here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3708] Scala API for CEP (initial).

2016-04-25 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1905#discussion_r60923941
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.cep
+import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+import scala.reflect.ClassTag
+
+/**
+  * Base class for a pattern definition.
+  *
+  * A pattern definition is used by 
[[org.apache.flink.cep.nfa.compiler.NFACompiler]] to create
+  * a [[org.apache.flink.cep.nfa.NFA]].
+  *
+  * {{{
+  * Pattern pattern = Pattern.begin("start")
+  * .next("middle").subtype(F.class)
+  * .followedBy("end").where(new MyFilterFunction());
+  * }
+  * }}}
+  *
+  * @param jPattern Underlying Java API Pattern
+  * @tparam T Base type of the elements appearing in the pattern
+  * @tparam F Subtype of T to which the current pattern operator is 
constrained
+  */
+class Pattern[T: ClassTag, F <: T](jPattern: JPattern[T, F]) {
--- End diff --

Why do we need the `ClassTag` here? I think it shouldn't be necessary.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r60923428
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -401,6 +400,52 @@ class Table(
 new Table(relBuilder.build(), tableEnv)
   }
 
+  /**
+* Sorts the given [[Table]]. Similar to SQL ORDER BY. The sorting is 
performed locally across
+* all partitions with keys equal to the given fields.
+*
+* Example:
+*
+* {{{
+*   tab.orderBy('name)
+* }}}
+*/
+  def orderBy(fields: Expression*): Table = {
+relBuilder.push(relNode)
+
+if (! fields.forall {
+  case x: UnresolvedFieldReference => true
+  case x@(_: Asc | _: Desc) => x.asInstanceOf[UnaryExpression]
--- End diff --

With a common base class for `Asc` and `Desc` this can be done more nicely, as sketched below.
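
With such a base class (say, `Ordering`), the check might collapse to 
something like this (a sketch; the base-class name is an assumption):

```
// Sketch only: assumes an Ordering base class that Asc and Desc extend.
if (!fields.forall {
  case _: UnresolvedFieldReference => true
  case o: Ordering => o.child.isInstanceOf[UnresolvedFieldReference]
  case _ => false
}) {
  throw new IllegalArgumentException(
    "All expressions must be field references or asc/desc expressions.")
}
```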


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r60923473
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetSortRule.scala
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.dataSet
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, 
DataSetSort}
+
+class DataSetSortRule
+  extends ConverterRule(
+classOf[LogicalSort],
+Convention.NONE,
+DataSetConvention.INSTANCE,
+"FlinkSortRule") {
+  override def convert(rel: RelNode): RelNode = {
--- End diff --

Please add a new line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2946] Add orderBy() to Table API

2016-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1926#discussion_r60923266
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
 ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+
+case class Asc(child: Expression) extends UnaryExpression {
--- End diff --

Could you add a common base class for orderings which `Asc` and `Desc` extend?
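
For example (a sketch; the `Ordering` name is an assumption):

```
// Sketch only: a common base class for ordering expressions.
abstract class Ordering extends UnaryExpression

case class Asc(child: Expression) extends Ordering   // body as in the PR
case class Desc(child: Expression) extends Ordering  // body as in the PR
```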


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: FLINK-3717 - Be able to re-start reading from ...

2016-04-25 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1895#issuecomment-214358848
  
I had some inline comments but overall the changes look good!

I think we can simplify the `BinaryInputFormat` by getting rid of the 
`filePos` and `justReadAllRecords` fields and just snapshotting the `blockPos`. 
The `filePos` and `justReadAllRecords` information functionally depends on the 
`blockPos`, so storing those fields just adds more complexity, since we're 
keeping track of all of them.

The snapshot would then just be `(blockPos, readRecords)`; upon restore, the 
correct file read position can be derived from the block/split start position.
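
In Scala-flavored pseudocode, the idea might look like this (an illustration 
only; it assumes `blockPos` is the index of the current block within the split 
and that blocks are fixed-size and split-aligned):

```
// Sketch only: snapshot just (blockPos, readRecords); the absolute file
// position is recomputed from the split's start offset on restore.
case class BlockSnapshot(blockPos: Long, readRecords: Long)

def restoredFilePos(splitStart: Long, blockSize: Long, s: BlockSnapshot): Long =
  splitStart + s.blockPos * blockSize
```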


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: FLINK-3717 - Be able to re-start reading from ...

2016-04-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1895#discussion_r60920236
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.core.io.InputSplit;
+
+import java.io.IOException;
+import java.io.Serializable;
+
--- End diff --

Ah, I mean:
```
/**
 * Extension of {@link InputFormat} that allows checkpointing/restoring the current channel state.
 *
 * @param <OT> The type of the produced records.
 * @param <T> The type of input split.
 * @param <S> The type of the channel state snapshot.
 */
public interface CheckpointableInputFormat<OT, T extends InputSplit, S extends Serializable> extends InputFormat<OT, T> {
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: FLINK-3717 - Be able to re-start reading from ...

2016-04-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1895#discussion_r60919358
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.core.io.InputSplit;
+
+import java.io.IOException;
+import java.io.Serializable;
+
--- End diff --

We could add a comment here, something like "Extension of {@link 
InputFormat} that allows checkpointing/restoring the current channel state".

Also, I think we should make the signature match `InputFormat`, i.e.:
`public interface CheckpointableInputFormat<OT, T extends InputSplit, S extends Serializable> extends InputFormat<OT, T>`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: FLINK-3717 - Be able to re-start reading from ...

2016-04-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1895#discussion_r60916945
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 ---
@@ -398,16 +401,20 @@ public FileBaseStatistics 
getStatistics(BaseStatistics cachedStats) throws IOExc
return null;
}
 
-   /**
-* Opens the given input split. This method opens the input stream to 
the specified file, allocates read buffers
-* and positions the stream at the correct position, making sure that 
any partial record at the beginning is skipped.
-* 
-* @param split The input split to open.
-* 
-* @see 
org.apache.flink.api.common.io.FileInputFormat#open(org.apache.flink.core.fs.FileInputSplit)
-*/
@Override
public void open(FileInputSplit split) throws IOException {
+   this.open(split, null);
+   }
+   /**
--- End diff --

Indentation is off


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Add util for TDD instantiation

2016-04-25 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1890#issuecomment-214338353
  
I like this, Chesnay. It definitely makes the tests more readable. Feel free to 
merge it if more people think this is useful.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: FLINK-3717 - Be able to re-start reading from ...

2016-04-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1895#discussion_r60915635
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java 
---
@@ -337,4 +402,18 @@ public int read(byte[] b, int off, int len) throws 
IOException {
return totalRead;
}
}
+
+   // 

--- End diff --

In most parts of the code we have dashes instead of equals signs, I think, 
like this:
```
// 
//  Checkpointing
// 
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: FLINK-3717 - Be able to re-start reading from ...

2016-04-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1895#discussion_r60914650
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java 
---
@@ -40,36 +41,46 @@
 import java.util.List;
 
 /**
- * Base class for all input formats that use blocks of fixed size. The 
input splits are aligned to these blocks. Without
- * configuration, these block sizes equal the native block sizes of the 
HDFS.
+ * Base class for all input formats that use blocks of fixed size. The 
input splits are aligned to these blocks,
+ * meaning that each split will consist of one block. Without 
configuration, these block sizes equal the native
+ * block sizes of the HDFS.
+ *
+ * A block will contain a {@link BlockInfo} at the end of the block. 
There, the reader can find some statistics
+ * about the split currently being read, that will help correctly parse 
the contents of the block.
  */
 @Public
-public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
+public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
+   implements CheckpointableInputFormat<T, FileInputSplit, Tuple3<Long, Long, Long>> {
+
private static final long serialVersionUID = 1L;
 
-   /**
-* The log.
-*/
+   /** The log. */
private static final Logger LOG = 
LoggerFactory.getLogger(BinaryInputFormat.class);
 
-   /**
-* The config parameter which defines the fixed length of a record.
-*/
+   /** The config parameter which defines the fixed length of a record. */
public static final String BLOCK_SIZE_PARAMETER_KEY = 
"input.block_size";
 
public static final long NATIVE_BLOCK_SIZE = Long.MIN_VALUE;
 
-   /**
-* The block size to use.
-*/
+   /** The block size to use. */
private long blockSize = NATIVE_BLOCK_SIZE;
 
private transient DataInputViewStreamWrapper dataInputStream;
 
+   /** The BlockInfo for the Block corresponding to the split currently 
being read. */
private transient BlockInfo blockInfo;
 
+   /** A wrapper around the block currently being read. */
+   private transient BlockBasedInput blockBasedInput = null;
+
+   /**
+* The number of records already read from the block.
+* This is used to decide if the end of the block has been
+* reached.
+*/
private long readRecords;
 
+// private transient Tuple3<Long, Long, Long> restoredState;
--- End diff --

Probably left over from the previous version of the code


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-25 Thread twalthr
Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1916#issuecomment-214332719
  
Thanks for the contribution @yjshen. I will also have a look at it tomorrow.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

