[GitHub] flink pull request: [FLINK-1330] [build] Build creates a link in t...

2015-01-26 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/333#discussion_r23527422
  
--- Diff: flink-dist/pom.xml ---
@@ -436,6 +436,37 @@ under the License.
 				</gitDescribe>
 			</configuration>
 		</plugin>
+
+		<!-- create a symbolic link to the build target in the root directory -->
+		<plugin>
+			<groupId>com.pyx4j</groupId>
+			<artifactId>maven-junction-plugin</artifactId>
+			<version>1.0.3</version>
+			<executions>
+				<execution>
+					<phase>package</phase>
+					<goals>
+						<goal>link</goal>
+					</goals>
+				</execution>
+				<execution>
+					<id>unlink</id>
+					<phase>clean</phase>
+					<goals>
+						<goal>unlink</goal>
+					</goals>
+				</execution>
+			</executions>
+			<configuration>
+				<links>
+					<link>
+						<dst>${basedir}/../build-target</dst>
--- End diff --

Isn't basedir deprecated? In the next line you use project.basedir.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1395] Add support for JodaTime in KryoS...

2015-01-26 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/304




[GitHub] flink pull request: [FLINK-1396][FLINK-1303] Hadoop Input/Output d...

2015-02-04 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/363

[FLINK-1396][FLINK-1303] Hadoop Input/Output directly in API

This adds methods on ExecutionEnvironment for reading with Hadoop
Input/OutputFormat.

This also adds support in the Scala API for Hadoop Input/OutputFormats.

I also added tests and updated the documentation.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink hadoop-in-api

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/363.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #363


commit 94376ce914c740e9880bf161e90ae92a0ced39ed
Author: Aljoscha Krettek aljoscha.kret...@gmail.com
Date:   2015-01-28T14:13:30Z

[FLINK-1396][FLINK-1303] Hadoop Input/Output directly in API

This adds methods on ExecutionEnvironment for reading with Hadoop
Input/OutputFormat.

This also adds support in the Scala API for Hadoop Input/OutputFormats.






[GitHub] flink pull request: [FLINK-1463] Fix stateful/stateless Serializer...

2015-01-30 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/353

[FLINK-1463] Fix stateful/stateless Serializers and Comparators

Before, Serializers would announce whether they are stateful or not and
rely on RuntimeStatefulSerializerFactory to do the duplication.
Comparators, on the other hand, had a duplicate method that the user was
required to call.

This commit removes the stateful/stateless property from Serializers but
instead introduces a duplicate() method, similar to Comparators, that
can return the same instance.

The two serializer factories are merged into one that always calls
duplicate() before returning a serializer.
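The duplicate() contract described above can be sketched as follows. This is a hypothetical, self-contained illustration of the pattern (class and method names are invented for the example, this is not Flink's actual TypeSerializer API): a stateless serializer may return itself, a stateful one must return a fresh copy, and the single factory always calls duplicate() so callers never need to know which kind they hold.

```java
// Hypothetical sketch of the duplicate() pattern; not Flink's actual API.
abstract class Serializer<T> {
    // Stateless serializers may return 'this'; stateful ones must return a fresh copy.
    abstract Serializer<T> duplicate();
}

// A stateless serializer: sharing one instance is safe.
class IntSerializer extends Serializer<Integer> {
    @Override
    IntSerializer duplicate() {
        return this; // no mutable state, the same instance can be reused
    }
}

// A stateful serializer: it keeps a reusable buffer, so each consumer needs its own copy.
class BufferingSerializer extends Serializer<byte[]> {
    private final byte[] buffer = new byte[64];

    @Override
    BufferingSerializer duplicate() {
        return new BufferingSerializer(); // fresh state per consumer
    }
}

public class SerializerDuplication {
    // The merged factory from the description: it always calls duplicate(),
    // so no stateful/stateless flag is needed anywhere.
    static <T> Serializer<T> getSerializer(Serializer<T> prototype) {
        return prototype.duplicate();
    }

    public static void main(String[] args) {
        IntSerializer ints = new IntSerializer();
        BufferingSerializer bytes = new BufferingSerializer();

        // Stateless: duplicate() hands back the same instance.
        System.out.println(getSerializer(ints) == ints);
        // Stateful: duplicate() hands back a distinct copy.
        System.out.println(getSerializer(bytes) == bytes);
    }
}
```

The point of the design is that the "may return the same instance" escape hatch keeps the common stateless case allocation-free while still being safe for stateful serializers.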

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink serializer-factories-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/353.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #353


commit 91834fee239b372b9a39f3c8f89ecbe42e2ae23a
Author: Aljoscha Krettek aljoscha.kret...@gmail.com
Date:   2015-01-30T15:43:31Z

[FLINK-1463] Fix stateful/stateless Serializers and Comparators

Before, Serializers would announce whether they are stateful or not and
rely on RuntimeStatefulSerializerFactory to do the duplication.
Comparators, on the other hand, had a duplicate method that the user was
required to call.

This commit removes the stateful/stateless property from Serializers but
instead introduces a duplicate() method, similar to Comparators, that
can return the same instance.

The two serializer factories are merged into one that always calls
duplicate() before returning a serializer.






[GitHub] flink pull request: Add support for Subclasses, Interfaces, Abstra...

2015-02-02 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/236#issuecomment-72433217
  
Yes, it is not a good solution. But what you propose isn't either: if we use 
Kryo for those subclasses that we cannot handle, then nothing works anymore. The 
whole reason we have support for POJOs is that we can theoretically compare 
them in their binary representation. We are not doing this right now (the 
PojoComparator always compares in deserialised form) but we added it with 
that goal in mind.

If we don't want to do that anymore we can just get rid of the whole POJO 
serialisation code and use Kryo for everything.




[GitHub] flink pull request: [FLINK-1458] Allow Interfaces and abstract typ...

2015-02-02 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/357

[FLINK-1458] Allow Interfaces and abstract types in TypeExtractor

Kryo already supports them, so it was just a question of the
TypeExtractor allowing them.

I also added tests for this.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink 
interfaces-generic-type-info-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/357.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #357


commit 6b1733178592c5c47145db165eb9c6797b156e19
Author: Aljoscha Krettek aljoscha.kret...@gmail.com
Date:   2015-02-02T15:08:18Z

[FLINK-1458] Allow Interfaces and abstract types in TypeExtractor

Kryo already supports them, so it was just a question of the
TypeExtractor allowing them.






[GitHub] flink pull request: [FLINK-1458] Allow Interfaces and abstract typ...

2015-02-03 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/357#issuecomment-72642370
  
You're right.




[GitHub] flink pull request: [FLINK-1396][FLINK-1303] Hadoop Input/Output d...

2015-02-05 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/363#issuecomment-73040767
  
I think the dependencies are not there when executing it in an IDE, since 
flink-java does not depend on flink-runtime, which has the Hadoop dependencies.




[GitHub] flink pull request: [FLINK-1463] Fix stateful/stateless Serializer...

2015-02-05 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/353#issuecomment-73077279
  
Do you think that with the additional checking logic this would really make 
up for one superfluous duplication? 




[GitHub] flink pull request: [FLINK-1463] Fix stateful/stateless Serializer...

2015-02-05 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/353#issuecomment-73079161
  
Ok, then I'll add this.





[GitHub] flink pull request: [FLINK-1458] Allow Interfaces and abstract typ...

2015-02-05 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/357




[GitHub] flink pull request: [FLINK-1396][FLINK-1303] Hadoop Input/Output d...

2015-02-05 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/363#issuecomment-73020223
  
I addressed the comments. What do the others think about overloading 
readFile()? I made it like this on purpose, so that the user sees in the API 
that they are using Hadoop input formats, or that they can be used.




[GitHub] flink pull request: Allow KeySelectors to implement ResultTypeQuer...

2015-02-03 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/354#issuecomment-72752435
  
Nope, sorry, I also have no idea why this is happening.




[GitHub] flink pull request: [FLINK-1395] Add support for JodaTime in KryoS...

2015-01-14 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/304#discussion_r22968293
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
 ---
@@ -99,6 +104,7 @@ public void testCopy() {

for (T datum : testData) {
T copy = serializer.copy(datum);
+   String str = copy.toString();
--- End diff --

Will change.





[GitHub] flink pull request: Add support for Subclasses, Interfaces, Abstra...

2015-01-15 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/236#issuecomment-70216890
  
No objections, your honour.




[GitHub] flink pull request: [FLINK-1395] Add support for JodaTime in KryoS...

2015-01-15 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/304#discussion_r22999053
  
--- Diff: flink-java/pom.xml ---
@@ -64,6 +64,18 @@ under the License.
 			<version>0.5.1</version>
 		</dependency>
 
+		<dependency>
--- End diff --

They are actually optional dependencies. They are not included unless we 
explicitly include them.
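For context, "optional" here refers to Maven's `<optional>` flag: an optional dependency is compiled against but is not pulled in transitively by downstream projects, so users must declare it themselves to get the corresponding support. A minimal illustrative fragment (the coordinates and version are only an example, not necessarily what the PR used):

```xml
<!-- Illustrative only: optional dependencies are excluded from
     transitive resolution; downstream projects opt in explicitly. -->
<dependency>
	<groupId>joda-time</groupId>
	<artifactId>joda-time</artifactId>
	<version>2.5</version>
	<optional>true</optional>
</dependency>
```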




[GitHub] flink pull request: [FLINK-1399] Add support for registering Seria...

2015-01-15 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/305#issuecomment-70065763
  
I added register methods at the ExecutionEnvironment.




[GitHub] flink pull request: [FLINK-1395] Add support for JodaTime in KryoS...

2015-01-15 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/304#issuecomment-70063665
  
I added entries to LICENSE and NOTICE and also addressed the other issues.




[GitHub] flink pull request: [FLINK-947] Add a declarative expression API

2015-02-18 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/405#issuecomment-74842606
  
Yeah, I'm also not sure about linq. I like the name but realise that it 
might be problematic. What do the others think? I could call it 
flink-expressions.

I will add documentation about which types are supported and a good error 
message for unsupported types as @rmetzger mentioned.




[GitHub] flink pull request: [FLINK-1417] Automatically register types with...

2015-02-18 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/393#issuecomment-74843783
  
Yes please, go ahead. But if our Pojo stuff is really that slow, we should 
think about how to improve it or remove it altogether.




[GitHub] flink pull request: [FLINK-1417] Automatically register types with...

2015-02-17 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/393#issuecomment-74744132
  
What exactly are you running? TPC-H Query 3? Maybe we should test how fast 
Kryo would be with the PojoComparator.




[GitHub] flink pull request: [FLINK-1417] Automatically register types with...

2015-02-16 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/393#discussion_r24749284
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
@@ -292,7 +360,76 @@ public void registerKryoType(Class<?> type) {
 	/**
 	 * Returns the registered POJO types.
 	 */
-	public Set<Class<?>> getRegisteredPojoTypes() {
+	public List<Class<?>> getRegisteredPojoTypes() {
 		return registeredPojoTypes;
 	}
+
+
+	public boolean isDisableAutoTypeRegistration() {
+		return disableAutoTypeRegistration;
+	}
+
+	/**
+	 * Controls whether Flink automatically registers all types in the user programs with
+	 * Kryo.
+	 *
+	 * @param disableAutoTypeRegistration
+	 */
+	public void setDisableAutoTypeRegistration(boolean disableAutoTypeRegistration) {
--- End diff --

I would prefer disableAutoTypeRegistration here. And then 
isAutoTypeRegistrationDisabled, above.




[GitHub] flink pull request: [FLINK-947] Add a declarative expression API

2015-02-16 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/405

[FLINK-947] Add a declarative expression API

This one is quite big, so you should check out the documentation, Scaladoc, 
examples, and test cases to see how the API works.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink linq

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/405.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #405


commit 147525ced43db6690a64fbae1395dbd258b8901d
Author: Aljoscha Krettek aljoscha.kret...@gmail.com
Date:   2014-10-03T16:25:15Z

Change translateToDataflow to return Operator

Before, translateToDataflow of SingleInputOperator could only return
a single input operator of the lower layer, same for TwoInputOperator.

This change allows translateToDataflow to return more kinds of
operators.

commit 58b5b9ec6e65855bfd71287deb6352dfc4498451
Author: Aljoscha Krettek aljoscha.kret...@gmail.com
Date:   2014-10-23T16:09:38Z

Add methods to CompositeType for querying field types and names

commit ac29ee3ad36a72d7c41549f38da1a00e66d85041
Author: Aljoscha Krettek aljoscha.kret...@gmail.com
Date:   2014-10-01T11:12:18Z

[FLINK-947] Add a declarative expression API






[GitHub] flink pull request: [FLINK-1422] Add withParameters() to documenta...

2015-01-29 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/350#issuecomment-72039131
  
+1 looks good to me




[GitHub] flink pull request: [FLINK-1460] fix typos

2015-01-29 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/346#issuecomment-72036147
  
+1, can you merge it @hsaputra or should I?




[GitHub] flink pull request: Add support for Subclasses, Interfaces, Abstra...

2015-01-26 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/236#issuecomment-71436404
  
I will have to rework this now that the support for registering Types and 
Serializers at Kryo was merged.

The POJO subclass with tagging is slower because we do additional checks 
and lookups: Upon serialisation we perform a map lookup to check whether the 
subclass is actually a registered class. When deserialising we have to fetch 
the correct subclass serialiser from an array of subclass serialisers.
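The extra work described above can be sketched as follows. This is a hypothetical, self-contained illustration (all names are invented for the example, this is not Flink's actual PojoSerializer): on the write path a map lookup resolves the concrete subclass to a tag, and on the read path the tag indexes into an array of subclass serialisers.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the subclass-tagging scheme; not Flink's actual code.
public class SubclassTagging {
    static class Base {}
    static class Sub1 extends Base {}
    static class Sub2 extends Base {}

    // Registered subclasses: class -> tag (the map lookup on serialisation).
    static final Map<Class<?>, Integer> TAGS = new HashMap<>();
    // tag -> subclass serialiser (the array lookup on deserialisation);
    // stand-in strings here instead of real serialiser objects.
    static final String[] SUBCLASS_SERIALIZERS = {"Sub1Serializer", "Sub2Serializer"};

    static {
        TAGS.put(Sub1.class, 0);
        TAGS.put(Sub2.class, 1);
    }

    // Serialisation: a map lookup checks that the concrete subclass is registered
    // and yields the tag to write into the stream.
    static int tagFor(Base value) {
        Integer tag = TAGS.get(value.getClass());
        if (tag == null) {
            throw new IllegalStateException("unregistered subclass: " + value.getClass());
        }
        return tag;
    }

    // Deserialisation: the tag read back from the stream indexes the serialiser array.
    static String serializerFor(int tag) {
        return SUBCLASS_SERIALIZERS[tag];
    }

    public static void main(String[] args) {
        int tag = tagFor(new Sub2());
        System.out.println(tag);
        System.out.println(serializerFor(tag));
    }
}
```

These two lookups, one hash lookup per record written and one array index per record read, are exactly the per-record overhead the comment is attributing the slowdown to.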




[GitHub] flink pull request: [FLINK-1369] [types] Add support for Subclasse...

2015-01-26 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/316#issuecomment-71445361
  
It's almost the same, except for the change to handle Interfaces and 
Abstract Classes with GenericTypeInfo, correct?

The part that changes the KryoSerializer must be adapted because of my 
recently merged PR that allows registering types and serializers at Kryo.




[GitHub] flink pull request: Add support for Subclasses, Interfaces, Abstra...

2015-02-10 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/236#discussion_r24403182
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
@@ -169,4 +186,113 @@ public ExecutionConfig disableObjectReuse() {
public boolean isObjectReuseEnabled() {
return objectReuse;
}
+
+	// --------------------------------------------------------------------------------------------
+	//  Registry for types and serializers
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Registers the given Serializer as a default serializer for the given type at the
--- End diff --

yes :dancers: 




[GitHub] flink pull request: [FLINK-1463] Fix stateful/stateless Serializer...

2015-02-09 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/353#issuecomment-73501310
  
Manually merged.




[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a GroupC...

2015-03-18 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/466#issuecomment-82886862
  
I would say it's good to go now.




[GitHub] flink pull request: Make Expression API available to Java, Rename ...

2015-03-20 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/503#issuecomment-83969916
  
Correct, that's why I'm doing a Pull Request. People can chime in here if 
they want.

Or should we continue the discussion on the mailing list? I thought everyone 
would be more or less happy with Table.




[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a partia...

2015-03-12 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/466#issuecomment-78101926
  
Sorry, I completely blanked. Of course you still need the grouping; only 
the shuffle step is not needed.

So, I suggest only better tests, using a combination of partitionByHash() 
and groupReducePartial().




[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a partia...

2015-03-10 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/466#discussion_r26105008
  
--- Diff: 
flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReducePartialProperties.java
 ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.operators;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.compiler.dag.SingleInputNode;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.LocalProperties;
+import org.apache.flink.compiler.dataproperties.PartitioningProperty;
+import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.compiler.dataproperties.RequestedLocalProperties;
+import org.apache.flink.compiler.plan.Channel;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+import java.util.Collections;
+import java.util.List;
+
+public final class GroupReducePartialProperties extends 
OperatorDescriptorSingle {
+
+   private final Ordering ordering;// ordering that we need to use 
if an additional ordering is requested 
+
+   public GroupReducePartialProperties(FieldSet groupKeys, Ordering 
additionalOrderKeys) {
+   super(groupKeys);
+
+   // if we have an additional ordering, construct the ordering to 
have primarily the grouping fields
+   
+   this.ordering = new Ordering();
+   for (Integer key : this.keyList) {
+   this.ordering.appendOrdering(key, null, Order.ANY);
+   }
+
+   // and next the additional order fields
--- End diff --

indentation




[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a partia...

2015-03-10 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/466#discussion_r26105136
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
 ---
@@ -156,6 +156,23 @@ public SortedGrouping(DataSet<T> set, Keys<T> keys, String field, Order order) {
 		return new GroupReduceOperator<T, R>(this, resultType, dataSet.clean(reducer), Utils.getCallLocationName());
 	}
 
+	/**
+	 * Applies a partial GroupReduce transformation on a grouped and sorted {@link DataSet}.
+	 *
+	 * In contrast to the reduceGroup transformation, the GroupReduce function is only called on each partition. Thus,
+	 * partial solutions are likely to occur.
+	 * @param reducer The ReduceFunction that is applied on the DataSet.
+	 * @return A GroupReducePartial operator which represents the partially reduced DataSet.
+	 */
+	public <R> GroupReducePartialOperator<T, R> reduceGroupPartially(GroupReduceFunction<T, R> reducer) {
+		if (reducer == null) {
+			throw new NullPointerException("GroupReduce function must not be null.");
+		}
+		TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getDataSet().getType());
+
+		return new GroupReducePartialOperator<T, R>(this, resultType, dataSet.clean(reducer), Utils.getCallLocationName());
+	}
+
--- End diff --

Why do we have this here? A partial GroupReduce doesn't make sense on a grouping; that's what a regular GroupReduce is. Or am I missing something?




[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a partia...

2015-03-10 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/466#discussion_r26105155
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
 ---
@@ -159,7 +159,23 @@ public UnsortedGrouping(DataSet<T> set, Keys<T> keys) {
 
	return new GroupReduceOperator<T, R>(this, resultType, dataSet.clean(reducer), Utils.getCallLocationName());
}
-   
+
+   /**
+* Applies a partial GroupReduce transformation on a grouped {@link 
DataSet}.
+* In contrast to the reduceGroup transformation, the GroupReduce 
function is only called on each partition. Thus,
+* partial solutions are likely to occur.
+* @param reducer The ReduceFunction that is applied on the DataSet.
+* @return A GroupReducePartial operator which represents the partially 
reduced DataSet
+*/
+	public <R> GroupReducePartialOperator<T, R> reduceGroupPartially(GroupReduceFunction<T, R> reducer) {
+		if (reducer == null) {
+			throw new NullPointerException("GroupReduce function must not be null.");
+		}
+		TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getDataSet().getType());
+
+		return new GroupReducePartialOperator<T, R>(this, resultType, dataSet.clean(reducer), Utils.getCallLocationName());
+	}
+
--- End diff --

Why do we have this here? A partial GroupReduce doesn't make sense on a grouping; that's what a regular GroupReduce is. Or am I missing something?




[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a partia...

2015-03-10 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/466#discussion_r26105226
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -355,6 +355,63 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
+   * Partial variant of the reduceGroup transformation which operates only 
on the individual
+   * partitions. This may lead to partially reduced results.
+   * Creates a new [[DataSet]] by passing for each group (elements with 
the same key) the list
+   * of elements to the group reduce function. The function must output 
one element. The
+   * concatenation of those will form the resulting [[DataSet]].
+   */
+  def reduceGroupPartially[R: TypeInformation: ClassTag](
+      fun: (Iterator[T]) => R): DataSet[R] = {
+    Validate.notNull(fun, "Group reduce function must not be null.")
+    val reducer = new GroupReduceFunction[T, R] {
+      val cleanFun = set.clean(fun)
+      def reduce(in: java.lang.Iterable[T], out: Collector[R]) {
+        out.collect(cleanFun(in.iterator().asScala))
+      }
+    }
+    wrap(
+      new GroupReducePartialOperator[T, R](maybeCreateSortedGrouping(),
+        implicitly[TypeInformation[R]], reducer, getCallLocationName()))
+  }
+
+  /**
+   * Partial variant of the reduceGroup transformation which operates only 
on the individual
+   * partitions. This may lead to partially reduced results.
+   * Creates a new [[DataSet]] by passing for each group (elements with 
the same key) the list
+   * of elements to the group reduce function. The function can output 
zero or more elements using
+   * the [[Collector]]. The concatenation of the emitted values will form 
the resulting [[DataSet]].
+   */
+  def reduceGroupPartially[R: TypeInformation: ClassTag](
+      fun: (Iterator[T], Collector[R]) => Unit): DataSet[R] = {
+    Validate.notNull(fun, "Group reduce function must not be null.")
+    val reducer = new GroupReduceFunction[T, R] {
+      val cleanFun = set.clean(fun)
+      def reduce(in: java.lang.Iterable[T], out: Collector[R]) {
+        cleanFun(in.iterator().asScala, out)
+      }
+    }
+    wrap(
+      new GroupReducePartialOperator[T, R](maybeCreateSortedGrouping(),
+        implicitly[TypeInformation[R]], reducer, getCallLocationName()))
+  }
+
+  /**
+   * Partial variant of the reduceGroup transformation which operates only 
on the individual
+   * partitions. This may lead to partially reduced results.
+   * Creates a new [[DataSet]] by passing for each group (elements with 
the same key) the list
+   * of elements to the [[GroupReduceFunction]]. The function can output 
zero or more elements. The
+   * concatenation of the emitted values will form the resulting 
[[DataSet]].
+   */
+  def reduceGroupPartially[R: TypeInformation: ClassTag](
+      reducer: GroupReduceFunction[T, R]): DataSet[R] = {
+    Validate.notNull(reducer, "GroupReduce function must not be null.")
+    wrap(
+      new GroupReducePartialOperator[T, R](maybeCreateSortedGrouping(),
+        implicitly[TypeInformation[R]], reducer, getCallLocationName()))
+  }
+
+  /**
--- End diff --

Again, why a partial reduce on a grouped DataSet? That's what the regular GroupReduce is.




[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a partia...

2015-03-10 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/466#issuecomment-78016698
  
I like the implementation, except for my comments on reduceGroupPartially() on grouped DataSets. Also, the tests seem a bit shady because of all the grouping and regular reduceGroup operations. I would suggest partitioning the data using a manual partition operation and then applying a GroupReducePartial.




[GitHub] flink pull request: Make Expression API available to Java, Rename ...

2015-03-25 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/503#issuecomment-86112294
  
I fixed @rmetzger's remarks. Still waiting for a solution to the naming 
issue.




[GitHub] flink pull request: [FLINK-1769] Fix deploy bug caused by ScalaDoc...

2015-03-26 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/535#issuecomment-86416475
  
I just wanted to leave it sitting here for a while. But if no-one has any 
reservations I'll merge it today.




[GitHub] flink pull request: [FLINK-1788] [table] Make logical plans transf...

2015-03-31 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/550

[FLINK-1788] [table] Make logical plans transformable



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink flinq-mutable-plans

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/550.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #550


commit 6b9d8e7e76a563534927451c8c61707d71f51cca
Author: Aljoscha Krettek aljoscha.kret...@gmail.com
Date:   2015-03-26T13:45:43Z

[FLINK-1788] [table] Make logical plans transformable






[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-03-03 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-76973194
  
Yes, this sounds good. Another thing that has probably already come up, but I just want to make sure: you implement CoGroup and Reduce the way you do because of performance, correct? That is, you don't do any work in the user code of a ReduceOperator but instead do it in a chained MapPartition, because there you get all the elements, which makes communication with the python process more efficient. Same with CoGroup, where you implement your own grouping logic in python from the raw input streams.

Overall, I like the architecture: the communication between the host and the guest language is well abstracted, and I can see this being reused for other languages.

Could you rename the CoGroupPython* classes to something more generic? They really are part of the generic language-binding stuff and not specific to python, correct?




[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-03-03 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-76981299
  
You could call it CoGroupRaw, just an idea...

Once that and the split into the python and generic parts are done, I vote for merging this. The API looks good, and other stuff, such as getting rid of the type annotations, can be worked on afterwards. I think it would be good to get people who are interested to try it out.

Also, the code is very well commented and documented. :smile_cat: 




[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-03-02 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-76738603
  
I'm the next person to be looking at this. Hopefully we can merge it after I've looked at it. :smile: 

@zentol Do you want to keep it in the current location or do you want to move it to flink-python?




[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-03-04 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-77127742
  
Thanks, I have two last requests, sorry for that.

Could you rename flink-generic to flink-language-binding-generic? The problem is that the package name is now flink-generic; it pops up like this in Maven Central and so on, without the information that it is actually a sub-package of flink-language-binding. This could be quite confusing.

In MapFunction.py and FilterFunction.py you use map() and filter(), respectively. These operations are not lazy: map() first applies the user map function to every element in the partition and only then collects the results. This can become a problem if the input is very big. Instead, we should iterate over the iterator and output each element right after mapping it; this keeps memory consumption low. The same applies to filter().
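The eager-versus-lazy difference can be sketched in plain Python (hypothetical helper names, not the actual flink-language-binding code):

```python
def eager_map(fn, iterator):
    # What a non-lazy map() effectively does: materializes the whole result
    # before anything is emitted, so memory grows with the partition size.
    return [fn(x) for x in iterator]

def lazy_map(fn, iterator):
    # Iterate and emit each mapped element immediately; memory consumption
    # stays constant no matter how large the partition is.
    for x in iterator:
        yield fn(x)

def lazy_filter(predicate, iterator):
    # Same idea for filter(): forward matching elements one at a time.
    for x in iterator:
        if predicate(x):
            yield x
```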




[GitHub] flink pull request: [FLINK-1769] Fix deploy bug caused by ScalaDoc...

2015-03-26 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/535




[GitHub] flink pull request: Add support for Subclasses, Interfaces, Abstra...

2015-01-29 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/236#issuecomment-72161285
  
Then it fails at runtime, which makes me very uneasy. But then again, stuff 
can always fail at runtime when the user uses some strange subclass. Even more 
so without this PR.




[GitHub] flink pull request: Stream graph + internal refactor

2015-04-14 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/594#issuecomment-92854325
  
Regarding StreamRecord, is the UID still required? In my understanding we are working towards state snapshotting, which would not require having IDs in the records anymore, correct? I mentioned at some earlier point that the IDs are not used anymore, so what's the status now?

If we got rid of the UID, we could then also get rid of StreamRecord and the StreamRecord serializer.




[GitHub] flink pull request: Stream graph + internal refactor

2015-04-14 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/594#issuecomment-92856455
  
By the way, I'm making all these comments just to keep track of things. If we discuss them and want to implement some changes, I can also do this myself; I just want to get opinions here.




[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-04-20 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-94443054
  
I just ran it on the cluster. Works like a charm. :smile: 

For word count, python takes 12 minutes, java about 2:40. But this should 
be expected, I guess.

Good to merge now, in my opinion.




[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-04-20 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-94402978
  
I was referring to the way that communication is handled between the java 
host and the generic language client: Communication between them is not based 
on a fixed set of Messages (for example, messages defined using something like 
Protobuf or Avro) but instead the knowledge about how messages are structured 
is implicit in the code that does the messaging. So the java side expects a 
sequence of primitives (integers, strings) in a certain order and the python 
side knows that order and sends them in this order.
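A minimal sketch of such an implicit protocol, with a record layout assumed purely for illustration (the real language-binding format differs):

```python
import struct

def write_record(buf, name, count):
    # Writer side (in Flink this would be the Java host). The layout -- a
    # 4-byte big-endian length, the UTF-8 bytes of the string, then a 4-byte
    # integer -- is not declared anywhere; both sides simply hard-code it.
    data = name.encode("utf-8")
    buf += struct.pack(">i", len(data)) + data + struct.pack(">i", count)
    return buf

def read_record(buf, offset=0):
    # Reader side (the guest-language client) must consume the fields in
    # exactly the order the writer emitted them; any mismatch silently
    # corrupts the rest of the stream.
    (length,) = struct.unpack_from(">i", buf, offset)
    offset += 4
    name = buf[offset:offset + length].decode("utf-8")
    offset += length
    (count,) = struct.unpack_from(">i", buf, offset)
    return name, count, offset + 4
```

The trade-off against schema-based messaging (Protobuf, Avro) is that the message structure lives only in the two code bases, so they must evolve in lockstep.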




[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-04-20 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-94400959
  
I'll test it again on a cluster. Could you please elaborate a bit? Is the timeout still in? Is communication through TCP instead of the mapped files, but still with the same basic interface of writing basic values?




[GitHub] flink pull request: [FLINK-1799][scala] Fix handling of generic ar...

2015-04-20 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/582#issuecomment-94392757
  
Any more thoughts? Otherwise I would like to merge this.




[GitHub] flink pull request: [FLINK-1799][scala] Fix handling of generic ar...

2015-04-20 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/582#discussion_r28692599
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
 ---
@@ -143,6 +143,18 @@ else if (type instanceof Class<?> && ((Class<?>) type).isArray()
		throw new InvalidTypesException("The given type is not a valid object array.");
	}
 
+	/**
+	 * Creates a new {@link org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo} from a
+	 * {@link TypeInformation} for the component type.
+	 *
+	 * <p>
+	 * This must be used in cases where the complete type of the array is not available as a
+	 * {@link java.lang.reflect.Type} or {@link java.lang.Class}.
+	 */
+	public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(TypeInformation<C> componentInfo) {
+		return new ObjectArrayTypeInfo<T, C>(Object[].class, componentInfo.getTypeClass(), componentInfo);
--- End diff --

That's right, I didn't think of that. Will change.




[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...

2015-04-20 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/612

[FLINK-1867/1880] Raise test timeouts in hope of fixing Travis fails



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink raise-test-timeouts

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/612.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #612


commit 4df27ee0ec1ce68376d51c4a882116651fd52788
Author: Aljoscha Krettek aljoscha.kret...@gmail.com
Date:   2015-04-13T14:16:51Z

[FLINK-1867/1880] Raise test timeouts in hope of fixing Travis fails






[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...

2015-04-21 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/612#issuecomment-94799019
  
Any thoughts on this? I would really like to merge this to improve Travis 
reliability.




[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-04-21 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-94798439
  
I merged it. :smile: 

Thanks a lot @zentol for staying with this for so long. Great work!

P.S. Could you please close this PR? I always forget adding the "closes #..." message.




[GitHub] flink pull request: [FLINK-1799][scala] Fix handling of generic ar...

2015-04-21 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/582#issuecomment-94799103
  
So, any thoughts about merging this?




[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-24 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/442#issuecomment-95846017
  
The problem is that I can't see it in the GitHub interface. On what branch are your changes? Could you please rebase them on top of the current master?




[GitHub] flink pull request: [FLINK-1789] [core] [runtime] [java-api] Allow...

2015-04-24 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/593#issuecomment-95845500
  
OK, @StephanEwen, any thoughts on this? Should we allow the local user-code class loader in the client to potentially not have the same jars available as the workers?




[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...

2015-04-24 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/612#issuecomment-95946066
  
Manually merged.




[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...

2015-04-24 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/612




[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-24 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/621#issuecomment-95926348
  
The build still fails because of missing license headers in the model 
package.

By the way, did you write the files in the model package yourself or were 
they generated?




[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-23 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/442#issuecomment-95570894
  
Where is your Git repository, so that I can check out your commit and merge it?





[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...

2015-04-23 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/612#discussion_r28943704
  
--- Diff: 
flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
 ---
@@ -18,24 +18,23 @@
 
 package org.apache.flink.api.scala.runtime.jobmanager
 
-import akka.actor.Status.{Success, Failure}
+import akka.actor.Status.Success
 import akka.actor.{ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
+import org.junit.Ignore
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.client.JobExecutionException
-import org.apache.flink.runtime.jobgraph.{JobGraph, AbstractJobVertex}
-import org.apache.flink.runtime.jobmanager.Tasks.{NoOpInvokable, 
BlockingNoOpInvokable}
+import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, JobGraph}
+import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, 
NoOpInvokable}
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import 
org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
-import 
org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.{JobManagerTerminated,
-NotifyWhenJobManagerTerminated}
+import 
org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.{JobManagerTerminated,
 NotifyWhenJobManagerTerminated}
 import org.apache.flink.runtime.testingUtils.TestingUtils
 import org.apache.flink.test.util.ForkableFlinkMiniCluster
-import org.junit.Ignore
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 
 @Ignore("Contains a bug with Akka 2.2.1")
--- End diff --

True, removing it




[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...

2015-04-23 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/612#discussion_r28943849
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
 ---
@@ -112,9 +112,9 @@ public void testTaskManagerProcessFailure() {
		Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
 
		Configuration jmConfig = new Configuration();
-		jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1 s");
-		jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4 s");
-		jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
+		jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1 ms");
--- End diff --

It seemed to run flawlessly, though. :smile: I ran the tests about 100 times by now without seeing another failure in the targeted tests.




[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...

2015-04-23 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/612#discussion_r28943875
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
 ---
@@ -112,9 +112,9 @@ public void testTaskManagerProcessFailure() {
 		Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
 
 		Configuration jmConfig = new Configuration();
-		jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1 s");
-		jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4 s");
-		jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
+		jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1 ms");
+		jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s");
+		jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 20);
--- End diff --

See above.




[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...

2015-04-23 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/612#discussion_r28943824
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
 ---
@@ -112,9 +112,9 @@ public void testTaskManagerProcessFailure() {
 		Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
 
 		Configuration jmConfig = new Configuration();
-		jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1 s");
-		jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4 s");
-		jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
+		jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1 ms");
--- End diff --

That's a typo, will fix it.




[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...

2015-04-23 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/612#discussion_r28943731
  
--- Diff: 
flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
 ---
@@ -136,9 +135,9 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers)
     config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
-    config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
+    config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s")
--- End diff --

As above, it doesn't affect test execution time.




[GitHub] flink pull request: [FLINK-1789] [core] [runtime] [java-api] Allow...

2015-04-23 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/593#issuecomment-95500680
  
This does not work if the user uses classes that are not available on the 
local machine since you don't add the additional class path entries in 
JobWithJars.buildUserCodeClassLoader(). Correct?




[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-23 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/442#issuecomment-95511047
  
This looks good to merge. Any objections?




[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...

2015-04-23 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/612#discussion_r28943598
  
--- Diff: 
flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
 ---
@@ -231,9 +231,9 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers)
     config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
-    config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
+    config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s")
--- End diff --

No, it doesn't. Before and after this change the four tests complete in about 8 
seconds. @tillrohrmann suggested that, since the actor system is local, there 
are some other mechanisms in play that signal failure in this case.




[GitHub] flink pull request: [FLINK-1472] Fixed Web frontend config overvie...

2015-04-23 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/439#issuecomment-95517673
  
Hi,
sorry for the long wait on this. I really like the feature, but the 
implementation is not scalable: if new config values are added, this needs to be 
updated in several places.

Could you change ConfigConstants and add a static initializer block that uses 
reflection to build the hash maps you currently build manually in 
DefaultConfigKeyValues? The code would just need to loop through all fields whose 
names end in _KEY and find the matching default value field without the _KEY 
suffix. From the default value field the type of the value can be determined, and 
the entry can be added to the appropriate hash map. This way, the defaults will 
always stay in sync with the actual config constants.
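For illustration, a minimal sketch of that reflection approach, using a hypothetical DemoConstants class in place of Flink's real ConfigConstants (the field names and values here are made up):

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ConfigConstants: every FOO_KEY field holding a
// config key has a matching FOO field holding the default value.
class DemoConstants {
    public static final String TASK_SLOTS_KEY = "taskmanager.numberOfTaskSlots";
    public static final int TASK_SLOTS = 1;
    public static final String HEARTBEAT_PAUSE_KEY = "akka.watch.heartbeat.pause";
    public static final String HEARTBEAT_PAUSE = "4000 ms";
}

public class DefaultsByReflection {

    // Scan all public static fields ending in _KEY and pair each key with the
    // default value stored in the field of the same name without the suffix.
    static Map<String, Object> collectDefaults(Class<?> clazz) throws Exception {
        Map<String, Object> defaults = new HashMap<>();
        for (Field field : clazz.getFields()) {
            String name = field.getName();
            if (Modifier.isStatic(field.getModifiers()) && name.endsWith("_KEY")) {
                try {
                    Field def = clazz.getField(name.substring(0, name.length() - "_KEY".length()));
                    defaults.put((String) field.get(null), def.get(null));
                } catch (NoSuchFieldException e) {
                    // key constant without a default value; skip it
                }
            }
        }
        return defaults;
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> defaults = collectDefaults(DemoConstants.class);
        System.out.println(defaults.get("taskmanager.numberOfTaskSlots")); // 1
        System.out.println(defaults.get("akka.watch.heartbeat.pause"));    // 4000 ms
    }
}
```

Because the map is derived from the fields themselves, a newly added key/default pair shows up automatically without touching any other class.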




[GitHub] flink pull request: [FLINK-1789] [core] [runtime] [java-api] Allow...

2015-04-23 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/593#issuecomment-95564844
  
Yes, this is true, but the way it is implemented, the folders are not 
always added to the class loader. Maybe I'm wrong here, but 
JobWithJars.getUserCodeClassLoader and JobWithJars.buildUserCodeClassLoader 
don't add the URLs to the ClassLoader that they create.
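As a sketch of the point being made (hypothetical code, not Flink's actual JobWithJars API): a URLClassLoader only sees entries that were actually put into its URL list, so extra class path directories have to be converted to URLs and added alongside the jar files:

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of building a user-code class loader from jar files
// plus additional class path entries.
public class UserCodeClassLoaderSketch {

    static URLClassLoader build(List<Path> jars, List<Path> classPaths, ClassLoader parent) {
        List<URL> urls = new ArrayList<>();
        try {
            for (Path jar : jars) {
                urls.add(jar.toUri().toURL());
            }
            // Without this loop, the extra class path entries are silently
            // ignored by the resulting class loader.
            for (Path dir : classPaths) {
                urls.add(dir.toUri().toURL());
            }
        } catch (MalformedURLException e) {
            throw new RuntimeException(e);
        }
        return new URLClassLoader(urls.toArray(new URL[0]), parent);
    }

    public static void main(String[] args) {
        URLClassLoader cl = build(
                List.of(Paths.get("udf.jar")),
                List.of(Paths.get("extra-classes/")),
                UserCodeClassLoaderSketch.class.getClassLoader());
        System.out.println(cl.getURLs().length); // 2
    }
}
```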




[GitHub] flink pull request: [FLINK-1398] Introduce extractSingleField() in...

2015-04-23 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/308#issuecomment-95566780
  
Yes, I think we should start a discussion there. I just wanted to give the 
reasons for my opinion here.




[GitHub] flink pull request: [FLINK-1472] Fixed Web frontend config overvie...

2015-04-23 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/439#issuecomment-95580562
  
Yes, but then we should change this now and not build more code on top of 
this that can fail in the future if someone forgets to add the names to the 
correct hash set in some other class.




[GitHub] flink pull request: [FLINK-1472] Fixed Web frontend config overvie...

2015-04-23 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/439#issuecomment-95581307
  
I added a Jira for this: https://issues.apache.org/jira/browse/FLINK-1936




[GitHub] flink pull request: [FLINK-924] Add automatic dependency retrieval...

2015-04-23 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/35#issuecomment-95590430
  
Hi @qmlmoon,
sorry for the long wait on this PR. Could you please rebase on top of the 
current master and also get rid of the merge commits in the process?




[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...

2015-04-23 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/612#discussion_r28961799
  
--- Diff: 
flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
 ---
@@ -231,9 +231,9 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers)
     config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
-    config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
+    config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s")
--- End diff --

Yes, removing them also from JobManagerFailsITCase




[GitHub] flink pull request: [FLINK-1472] Fixed Web frontend config overvie...

2015-04-23 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/439#issuecomment-95561718
  
But then I think the solution is to normalise the constants in 
ConfigConstants.




[GitHub] flink pull request: [FLINK-1799][scala] Fix handling of generic ar...

2015-04-20 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/582#issuecomment-94483743
  
I fixed @StephanEwen's complaint. It was incorrect but the type class of 
TypeInformation does not seem to be used in any places where it matters.




[GitHub] flink pull request: [FLINK-1927] [py] Operator distribution rework

2015-04-29 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/638#issuecomment-97412959
  
But doesn't this mean that the lambdas now must be stateless? I.e., if a 
user refers to some variable outside the lambda, it will not be serialised 
with the closure anymore (because there is no serialization of the closure 
anymore).
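The PR under discussion concerns the Python API, but the general point can be illustrated in Java: a serialized closure carries its captured variables with it, which is exactly what is lost when the closure is no longer serialized (a sketch; all names are hypothetical):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCapture {

    // A serializable single-method interface so lambdas assigned to it can be
    // written with Java serialization.
    public interface SerFn extends Serializable {
        int apply(int x);
    }

    public static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    public static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        int offset = 42;                    // state captured from the enclosing scope
        SerFn fn = x -> x + offset;         // the serializable lambda captures 'offset'
        SerFn copy = (SerFn) deserialize(serialize(fn));
        System.out.println(copy.apply(1));  // 43: the captured value travelled along
    }
}
```

If only a bare function reference is shipped instead of the serialized closure, the captured `offset` stays behind, which is the concern raised above.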




[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-29 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/621#issuecomment-97401561
  
Our travis builds are a bit unstable right now. I'm running some last tests 
and then I'll merge this. Thanks for staying with this and working on my 
requests! :smile: 




[GitHub] flink pull request: [FLINK-1924] Minor Refactoring

2015-04-27 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/616#issuecomment-96573887
  
+1, can you merge it, @mxm?




[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-27 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/621#issuecomment-96565454
  
The tests are failing because you use spaces for indentation in your code. 
Could you please change all indentation to tabs to satisfy the style checker?




[GitHub] flink pull request: [FLINK-1789] [core] [runtime] [java-api] Allow...

2015-04-27 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/593#issuecomment-96532907
  
Yes, this would make things a lot cleaner. @twalthr what do you think?




[GitHub] flink pull request: [FLINK-924] Add automatic dependency retrieval...

2015-04-28 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/35#issuecomment-96960370
  
Thanks for working on this with me. Very nice contribution. :smile: 




[GitHub] flink pull request: [streaming] New Source and state checkpointing...

2015-05-04 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/643#issuecomment-98690522
  
Also, the key would be a property of a DataStream, and other operations 
could also use this.




[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat

2015-04-28 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/621#issuecomment-96953776
  
Unfortunately, Travis cuts off the log if it is too long, like here: 
https://travis-ci.org/aljoscha/flink/jobs/60177866 (that's from your pull 
request). You have to click on the "Download log" button; then you can view the 
whole log, where you see the checkstyle errors.

You can also see the errors if you run mvn clean verify on your local 
machine.

It seems your code still contains spaces. (From the recently failed Travis 
builds.)




[GitHub] flink pull request: [FLINK-1978] Fix POJO deserialization copy f...

2015-05-07 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/655#issuecomment-99757603
  
Looks good. Can you go ahead and merge it?




[GitHub] flink pull request: [FLINK-1977] Rework Stream Operators to always...

2015-05-07 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/659

[FLINK-1977] Rework Stream Operators to always be push based



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink event-time

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/659.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #659


commit 2e2abbd4afb6f058abb7b5d4a287349cdef5d8f6
Author: Aljoscha Krettek aljoscha.kret...@gmail.com
Date:   2015-05-04T13:53:36Z

[FLINK-1977] Rework Stream Operators to always be push based

commit 58843949102c0506b30c80cfb36065d303a64cda
Author: Aljoscha Krettek aljoscha.kret...@gmail.com
Date:   2015-05-06T16:11:52Z

Change StreamOperator to be an Interface

This introduces new AbstractOperator classes for UDF operators and
operators without UDFs.

commit 4e0fadd4c4069817362382dbe8c7287f3b0150ff
Author: Aljoscha Krettek aljoscha.kret...@gmail.com
Date:   2015-05-07T07:39:54Z

Change StreamOperator to take Output instead of Collector

Output is a richer Interface that extends Collector. Right now it does
not contain additional methods. But this will be extended to support
forwarding of Low Watermarks/Barriers.

commit f0d2c3bb47a63ed50f543732665a8b88872d839d
Author: Aljoscha Krettek aljoscha.kret...@gmail.com
Date:   2015-05-07T09:06:52Z

Simplify StreamTask and derived classes

commit a4be5138c6262e104d83263d6e4800e416d6fd4a
Author: Aljoscha Krettek aljoscha.kret...@gmail.com
Date:   2015-05-07T09:19:44Z

Remove unused imports
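The Output/Collector relationship described in the commit messages above can be sketched roughly as follows (simplified; the real Flink interfaces differ in detail):

```java
import java.util.ArrayList;
import java.util.List;

// Collector is the basic record-emitting interface.
interface Collector<T> {
    void collect(T record);
    void close();
}

// Output extends Collector without adding methods yet; the extra interface
// leaves room to later add forwarding of low watermarks/barriers without
// touching Collector itself.
interface Output<T> extends Collector<T> {
}

public class OutputSketch {
    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        // An operator that is handed an Output can still use it exactly like
        // a plain Collector.
        Output<String> out = new Output<String>() {
            @Override
            public void collect(String record) {
                sink.add(record);
            }

            @Override
            public void close() {
            }
        };
        out.collect("hello");
        System.out.println(sink); // [hello]
    }
}
```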






[GitHub] flink pull request: [hotfix][scala] Let type analysis work on some...

2015-05-07 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/660

[hotfix][scala] Let type analysis work on some Java types

This is mostly to allow Vertex, which is a Java Tuple2, to be used in
the Scala Gelly API.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink scala-type-analysis-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/660.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #660


commit 6c382dccc6ddc825f348aa9dc91bf64d857f400e
Author: Aljoscha Krettek aljoscha.kret...@gmail.com
Date:   2015-05-07T12:27:33Z

[hotfix][scala] Let type analysis work on some Java types

This is mostly to allow Vertex, which is a Java Tuple2 to be used in
the Scala Gelly API.






[GitHub] flink pull request: [FLINK-1977] Rework Stream Operators to always...

2015-05-08 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/659#issuecomment-100144884
  
I think this would not solve our problems. I will start a discussion thread 
on the dev list.




[GitHub] flink pull request: [FLINK-1977] Rework Stream Operators to always...

2015-05-07 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/659#discussion_r29851342
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
 ---
@@ -64,6 +64,10 @@
 
private volatile boolean isRunning = false;
--- End diff --

Correct, removing.




[GitHub] flink pull request: [FLINK-1977] Rework Stream Operators to always...

2015-05-07 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/659#discussion_r29851632
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
 ---
@@ -36,9 +36,9 @@
/**
 * Main work method of the source. This function is invoked at the 
beginning of the
--- End diff --

Oversight on my part; I also forgot to rework the FileMonitoringFunction (a 
source).




[GitHub] flink pull request: [FLINK-1977] Rework Stream Operators to always...

2015-05-07 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/659#discussion_r29854565
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
 ---
@@ -1,149 +1,149 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.flume;
-
-import java.util.List;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.ConnectorSource;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.util.Collector;
-import org.apache.flume.Context;
-import org.apache.flume.channel.ChannelProcessor;
-import org.apache.flume.source.AvroSource;
-import org.apache.flume.source.avro.AvroFlumeEvent;
-import org.apache.flume.source.avro.Status;
-
-public class FlumeSource<OUT> extends ConnectorSource<OUT> {
-	private static final long serialVersionUID = 1L;
-
-	String host;
-	String port;
-	volatile boolean finished = false;
-
-	private volatile boolean isRunning = false;
-
-	FlumeSource(String host, int port, DeserializationSchema<OUT> deserializationSchema) {
-		super(deserializationSchema);
-		this.host = host;
-		this.port = Integer.toString(port);
-	}
-
-	public class MyAvroSource extends AvroSource {
-		Collector<OUT> collector;
-
-		/**
-		 * Sends the AvroFlumeEvent from it's argument list to the Apache Flink
-		 * {@link DataStream}.
-		 *
-		 * @param avroEvent
-		 *            The event that should be sent to the dataStream
-		 * @return A {@link Status}.OK message if sending the event was
-		 *         successful.
-		 */
-		@Override
-		public Status append(AvroFlumeEvent avroEvent) {
-			collect(avroEvent);
-			return Status.OK;
-		}
-
-		/**
-		 * Sends the AvroFlumeEvents from it's argument list to the Apache Flink
-		 * {@link DataStream}.
-		 *
-		 * @param events
-		 *            The events that is sent to the dataStream
-		 * @return A Status.OK message if sending the events was successful.
-		 */
-		@Override
-		public Status appendBatch(List<AvroFlumeEvent> events) {
-			for (AvroFlumeEvent avroEvent : events) {
-				collect(avroEvent);
-			}
-
-			return Status.OK;
-		}
-
-		/**
-		 * Deserializes the AvroFlumeEvent before sending it to the Apache Flink
-		 * {@link DataStream}.
-		 *
-		 * @param avroEvent
-		 *            The event that is sent to the dataStream
-		 */
-		private void collect(AvroFlumeEvent avroEvent) {
-			byte[] b = avroEvent.getBody().array();
-			OUT out = FlumeSource.this.schema.deserialize(b);
-
-			if (schema.isEndOfStream(out)) {
-				FlumeSource.this.finished = true;
-				this.stop();
-				FlumeSource.this.notifyAll();
-			} else {
-				collector.collect(out);
-			}
-
-		}
-
-	}
-
-	MyAvroSource avroSource;
-
-	/**
-	 * Configures the AvroSource. Also sets the collector so the application can
-	 * use it from outside of the invoke function.
-	 *
-	 * @param collector
-	 *            The collector used in the invoke function
-	 */
-	public void configureAvroSource(Collector<OUT>

[GitHub] flink pull request: [FLINK-1977] Rework Stream Operators to always...

2015-05-07 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/659#discussion_r29855036
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
 ---
@@ -183,6 +173,16 @@ public void open(org.apache.flink.configuration.Configuration parameters) throws
}
}
 
+   @Override
+   public void close() throws Exception {
+   super.close();
+   if (activePolicyThread != null) {
+   activePolicyThread.interrupt();
+   }
+
+   emitWindow();
--- End diff --

I think it's fine in streaming. open(), receiveElement(), close() are all 
called in the inner loop of StreamTask.invoke()




[GitHub] flink pull request: [FLINK-1977] Rework Stream Operators to always...

2015-05-07 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/659#discussion_r29855250
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(OneInputStreamTask.class);
+
+   protected StreamRecordSerializer<IN> inSerializer;
+   protected IndexedReaderIterator<StreamRecord<IN>> recordIterator;
+
+
+   @Override
+   public void registerInputOutput() {
+   super.registerInputOutput();
+   InputHandler<IN> inputHandler = new InputHandler<IN>(this);
+   inSerializer = inputHandler.getInputSerializer();
+   recordIterator = inputHandler.getInputIter();
+   }
+
+   /*
+* Reads the next record from the reader iterator and stores it in the
+* nextRecord variable
+*/
+   protected StreamRecord<IN> readNext() throws IOException {
+   StreamRecord<IN> nextRecord = inSerializer.createInstance();
+   try {
+   return recordIterator.next(nextRecord);
+   } catch (IOException e) {
+   if (isRunning) {
+   throw new RuntimeException("Could not read next record due to: "
+   + StringUtils.stringifyException(e));
+   } else {
+   // Task already cancelled do nothing
+   return null;
+   }
+   } catch (IllegalStateException e) {
+   if (isRunning) {
+   throw new RuntimeException("Could not read next record due to: "
+   + StringUtils.stringifyException(e));
+   } else {
+   // Task already cancelled do nothing
+   return null;
+   }
+   }
+   }
+
+   @Override
+   public void invoke() throws Exception {
+   this.isRunning = true;
+
+   boolean operatorOpen = false;
+
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Task {} invoked", getName());
+   }
+
+   try {
+   openOperator();
+   operatorOpen = true;
+
+   StreamRecord<IN> nextRecord;
+   while (isRunning  (nextRecord = readNext()) != null) {
+   streamOperator.receiveElement(nextRecord.getObject());
+   }
+
+   closeOperator();
+   operatorOpen = false;
+
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Task {} invocation finished", getName());
+   }
+
+   } catch (Exception e) {
+
+   if (operatorOpen) {
+   try {
+   closeOperator();
+   } catch (Throwable t) {
+   // TODO: why are we not doing anything 
here?
--- End diff --

Seems reasonable.
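
For context on why an empty catch can be reasonable here: closing the operator is cleanup on an error path, and reacting to a secondary failure would risk masking the original exception. Since Java 7 that intent can be made explicit with addSuppressed. A hedged sketch with invented names (SuppressedDemo is illustrative, not the PR's code):

```java
// Sketch: when cleanup fails while an earlier exception is already in flight,
// attach the cleanup failure via addSuppressed() instead of either swallowing
// it silently or letting it mask the original exception.
public class SuppressedDemo {

    static void process() throws Exception {
        throw new RuntimeException("processing failed");
    }

    static void closeOperator() throws Exception {
        throw new IllegalStateException("close failed");
    }

    static void run() throws Exception {
        try {
            process();
        } catch (Exception e) {
            try {
                closeOperator();
            } catch (Throwable t) {
                e.addSuppressed(t); // keep the cleanup failure, keep e primary
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        try {
            run();
        } catch (Exception e) {
            System.out.println(e.getMessage()
                    + " / suppressed: " + e.getSuppressed()[0].getMessage());
            // prints: processing failed / suppressed: close failed
        }
    }
}
```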



[GitHub] flink pull request: [FLINK-1977] Rework Stream Operators to always...

2015-05-07 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/659#discussion_r29854531
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
 ---
@@ -165,11 +165,10 @@ private void decorateNode(Integer vertexID, JSONObject node) throws JSONExceptio
node.put("PACT", "Data Stream");
}
 
-   StreamOperator<?, ?> operator = streamGraph.getStreamNode(vertexID).getOperator();
+   StreamOperator<?> operator = streamGraph.getStreamNode(vertexID).getOperator();
 
-   if (operator != null && operator.getUserFunction() != null) {
-   node.put("CONTENTS", vertex.getOperatorName() + " at "
-   + operator.getUserFunction().getClass().getSimpleName());
+   if (operator != null) {
+   node.put("CONTENTS", vertex.getOperatorName());
} else {
node.put("CONTENTS", vertex.getOperatorName());
}
--- End diff --

Changing





[GitHub] flink pull request: [FLINK-1977] Rework Stream Operators to always...

2015-05-07 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/659#discussion_r29854951
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
 ---
@@ -60,29 +62,45 @@ public void run() throws Exception {
groupedDiscretizers.put(key, 
groupDiscretizer);
}
 
-   groupDiscretizer.processRealElement(nextObject);
+   groupDiscretizer.processRealElement(element);
}
 
-   }
 
-   for (StreamDiscretizer<IN> group : groupedDiscretizers.values()) {
-   group.emitWindow();
-   }
+
 
}
 
@Override
public void open(org.apache.flink.configuration.Configuration 
parameters) throws Exception {
super.open(parameters);
-   centralThread = new Thread(new CentralCheck());
+   centralCheck = new CentralCheck();
+   centralThread = new Thread(centralCheck);
centralThread.start();
}
 
+   @Override
+   public void close() throws Exception {
+   super.close();
+   for (StreamDiscretizer<IN> group : groupedDiscretizers.values()) {
+   group.emitWindow();
+   }
+
+   try {
+   centralCheck.running = false;
+   centralThread.interrupt();
+   centralThread.join();
+   } catch (InterruptedException e) {
+   e.printStackTrace();
--- End diff --

But what do you want me to do?
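
One conventional answer (an assumption about the reviewer's intent, not a claim about what Flink settled on) is to restore the interrupt status instead of only printing the stack trace, so callers further up the stack can still observe the interruption. A minimal sketch with invented names (ShutdownDemo is illustrative only):

```java
public class ShutdownDemo {

    static volatile boolean running = true;

    // Stops a worker thread and joins it; if the joining thread is itself
    // interrupted while waiting, re-assert the flag rather than swallow it.
    static boolean shutdown(Thread worker) {
        running = false;
        worker.interrupt();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
        }
        return !worker.isAlive();
    }

    static Thread startWorker() {
        Thread worker = new Thread(() -> {
            while (running) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    return; // interrupted: leave the loop
                }
            }
        });
        worker.start();
        return worker;
    }

    public static void main(String[] args) {
        System.out.println("stopped cleanly: " + shutdown(startWorker()));
    }
}
```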




[GitHub] flink pull request: [FLINK-1977] Rework Stream Operators to always...

2015-05-07 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/659#discussion_r29855135
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
+
+   private static final Logger LOG = LoggerFactory.getLogger(OneInputStreamTask.class);
+
+   protected StreamRecordSerializer<IN> inSerializer;
+   protected IndexedReaderIterator<StreamRecord<IN>> recordIterator;
+
+
+   @Override
+   public void registerInputOutput() {
+   super.registerInputOutput();
+   InputHandler<IN> inputHandler = new InputHandler<IN>(this);
+   inSerializer = inputHandler.getInputSerializer();
+   recordIterator = inputHandler.getInputIter();
+   }
+
+   /*
+* Reads the next record from the reader iterator and stores it in the
+* nextRecord variable
+*/
+   protected StreamRecord<IN> readNext() throws IOException {
+   StreamRecord<IN> nextRecord = inSerializer.createInstance();
+   try {
+   return recordIterator.next(nextRecord);
+   } catch (IOException e) {
+   if (isRunning) {
+   throw new RuntimeException("Could not read next record due to: "
+   + StringUtils.stringifyException(e));
--- End diff --

Working on it.



