[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-26 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/339#issuecomment-71489135
  
But that is exactly what is changing: both the delete and the copy process are synchronized on the same object.


---


[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-26 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/339#issuecomment-71490079
  
Oh, I see what you mean: extend the synchronized block to include the actual delete logic. Yes, that's a good idea. All I know is that I tried it without the change and ran into issues; with the change it ran.
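
To illustrate the synchronization pattern being discussed, here is a minimal, hypothetical sketch (not the actual DistributedCache code): the copy and delete paths guard their work with the same lock object, so a delete can no longer interleave with an ongoing copy.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class CacheFileGuard {
    // the single shared object both processes synchronize on
    private final Object lock = new Object();

    void copyFile(Path src, Path dst) throws IOException {
        synchronized (lock) { // copy cannot run while a delete holds the lock
            Files.copy(src, dst, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    void deleteFile(Path file) throws IOException {
        synchronized (lock) { // block extended to cover the actual delete, as suggested above
            Files.deleteIfExists(file);
        }
    }
}
```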


---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-02-03 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-72652471
  
Hmm... you are now the second person to report that creating the tmp files does not work on OS X. I don't know why it fails; the file creation is done from Java. Is there any magic required there? I can't debug the OS X error myself at the moment, so all I can do on that front is add sanity checks for better error reporting.

The included triangle enumeration is kind of odd: even if it runs, the output is empty. I already checked the implementation yesterday and it appears equal to the Java counterpart. I will give it another go.

The plan execution is one of the more fragile parts. Generally, when the process exits with an error, it is noticed. But if, for example, something is missing (like the call to execute), things just get stuck. This is due to the fact that information is only ever sent to Java, but never received back: a complete one-way street. Since neither accumulators nor actions were supposed to be implemented anytime soon this seemed appropriate, but it seems that this already requires a change. Some timeouts could be useful as well.

@fhueske Thanks for trying it out!


---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-02-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/202#discussion_r23955578
  
--- Diff: docs/python_programming_guide.md ---
@@ -0,0 +1,600 @@
+---
+title: "Python Programming Guide"
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+
+<a href="#top"></a>
+
+Introduction
+---
+
+Analysis programs in Flink are regular programs that implement transformations on data sets
+(e.g., filtering, mapping, joining, grouping). The data sets are initially created from certain
+sources (e.g., by reading files, or from collections). Results are returned via sinks, which may for
+example write the data to (distributed) files, or to standard output (for example the command line
+terminal). Flink programs run in a variety of contexts, standalone, or embedded in other programs.
+The execution can happen in a local JVM, or on clusters of many machines.
+
+In order to create your own Flink program, we encourage you to start with the
+[program skeleton](#program-skeleton) and gradually add your own
+[transformations](#transformations). The remaining sections act as references for additional
+operations and advanced features.
+
+
+Example Program
+---
+
+The following program is a complete, working example of WordCount. You can copy & paste the code
+to run it locally.
+
+{% highlight python %}
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import INT, STRING
+from flink.functions.GroupReduceFunction import GroupReduceFunction
+
+class Adder(GroupReduceFunction):
+  def reduce(self, iterator, collector):
+    count, word = iterator.next()
+    count += sum([x[0] for x in iterator])
+    collector.collect((count, word))
+
+if __name__ == "__main__":
+  env = get_environment()
+  data = env.from_elements("Who's there?",
+                           "I think I hear them. Stand, ho! Who's there?")
+
+  data \
+    .flat_map(lambda x: x.lower().split(), (INT, STRING)) \
+    .group_by(1) \
+    .reduce_group(Adder(), (INT, STRING), combinable=True) \
+    .output()
+
+  env.execute()
+{% endhighlight %}
--- End diff --

fixed


---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-02-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/202#discussion_r23958310
  
--- Diff: flink-addons/flink-language-binding/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java ---
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.languagebinding.api.java.common;
+
+/**
+ * Container for all generic information related to operations. This class contains the absolute minimum fields that are
+ * required for all operations. This class should be extended to contain any additional fields required on a
+ * per-language basis.
+ */
+public abstract class OperationInfo {
+   public int parentID; //DataSet that an operation is applied on
+   public int otherID; //secondary DataSet
+   public int setID; //ID for new DataSet
+   public int[] keys1; //grouping keys
+   public int[] keys2; //grouping keys
+   public int[] projectionKeys1; //projection keys
+   public int[] projectionKeys2; //projection keys
+   public Object types; //an object that is of the same type as the output type
--- End diff --

A transient field is a good idea...


---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-02-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/202#discussion_r23954389
  
--- Diff: flink-addons/flink-language-binding/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java ---
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.languagebinding.api.java.common;
+
+/**
+ * Container for all generic information related to operations. This class contains the absolute minimum fields that are
+ * required for all operations. This class should be extended to contain any additional fields required on a
+ * per-language basis.
+ */
+public abstract class OperationInfo {
+   public int parentID; //DataSet that an operation is applied on
+   public int otherID; //secondary DataSet
+   public int setID; //ID for new DataSet
+   public int[] keys1; //grouping keys
+   public int[] keys2; //grouping keys
+   public int[] projectionKeys1; //projection keys
+   public int[] projectionKeys2; //projection keys
+   public Object types; //an object that is of the same type as the output type
--- End diff --

Yes, that would be nicer, but the last time I tried that I got a NotSerializableException due to the TypeInformation.


---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-02-02 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-72537053
  
"hard to find error messages": what do you mean by that?

What did you run the job with (data size, dop)?


---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-02-03 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-72744279
  
About error messages going to the command line: the only way I see for that to work is by wrapping the complete error message into an exception, since exceptions do show up on the command line.

WC deadlock: I just can't reproduce it. I tried small files (4 words) and went up to 750 MB with dop=1. Can you send me the test data you used?

@qmlmoon THANK YOU! That would have taken me ages to figure out. Working on a fix right now.


---


[GitHub] flink pull request: [FLINK-1422] Add withParameters() to documenta...

2015-02-05 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/350#discussion_r24168173
  
--- Diff: docs/programming_guide.md ---
@@ -2398,6 +2399,61 @@ of a function, or use the `withParameters(...)` method to pass in a configuratio
 [Back to top](#top)
 
 
+Passing parameters to functions
+---
+
+Parameters can be passed to rich functions using either the constructor (if the function is
--- End diff --

OK, so which requirements exist for the operator to be serializable? Or can I just omit those details and say something like: parameters can be stored in non-transient fields if the function is serializable.
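
For reference, a hedged example of the documentation point in question: a parameter stored in a non-transient field of a serializable rich function, set via the constructor. The LimitFilter class is made up for illustration; only RichFilterFunction is Flink API.

```java
import org.apache.flink.api.common.functions.RichFilterFunction;

public class LimitFilter extends RichFilterFunction<Integer> {
    private final int limit; // non-transient, so it is serialized along with the function

    public LimitFilter(int limit) {
        this.limit = limit;
    }

    @Override
    public boolean filter(Integer value) {
        return value > limit;
    }
}
```

It would then be used as `data.filter(new LimitFilter(5))`, with no Configuration object involved.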


---


[GitHub] flink pull request: [FLINK-785] Chained AllReduce

2015-02-06 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/370

[FLINK-785] Chained AllReduce

This is a preliminary PR to see whether I'm on the right track.

I'm wondering whether this would be everything needed to add a chained AllReduce before I continue with this issue. I tried it out and it appears to work, but I wanted to make sure.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/incubator-flink chained_all_reduce

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/370.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #370


commit 37b77670b517995498f83df452ed5b20754fc63e
Author: zentol s.mo...@web.de
Date:   2015-02-06T13:38:05Z

[FLINK-785] Chained AllReduce




---


[GitHub] flink pull request: [FLINK-785] Chained AllReduce

2015-02-08 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/370#issuecomment-73404704
  
There's something funky going on with the tests here.

I got 2 failing tests in ObjectReuseITCase:
```

ObjectReuseITCase>JavaProgramTestBase.testJobWithoutObjectReuse:168->postSubmit:68->TestBaseUtils.compareResultsByLinesInMemory:223->TestBaseUtils.compareResultsByLinesInMemory:238
 arrays first differed at element [0]; expected:<a,[10]0> but was:<a,[6]0>


ObjectReuseITCase>JavaProgramTestBase.testJobWithObjectReuse:120->postSubmit:68->TestBaseUtils.compareResultsByLinesInMemory:223->TestBaseUtils.compareResultsByLinesInMemory:238
 arrays first differed at element [0]; expected:<a,[10]0> but was:<a,[6]0>
```
These two tests verify the wrong behaviour that occurs when object reuse is enabled but not accounted for. I thought this was generally treated as undefined behaviour; why are there tests for that?
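
For readers unfamiliar with the pitfall: a minimal, self-contained illustration (generic names, not the failing test's code) of what happens when object reuse is enabled but the user function stores references instead of copies.

```java
import java.util.ArrayList;
import java.util.List;

class ReusePitfall {
    static class Record { int value; }

    public static void main(String[] args) {
        Record reused = new Record();          // the runtime hands out this one instance
        List<Record> seen = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            reused.value = i;                  // each "new" record overwrites it in place
            seen.add(reused);                  // bug: stores the same reference three times
        }
        System.out.println(seen.get(0).value); // prints 2, not 0
    }
}
```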

The other 2 tests fail with a NullPointerException when accessing the expected result.
```

ClosureCleanerITCase.after:52->TestBaseUtils.compareResultsByLinesInMemory:223->TestBaseUtils.compareResultsByLinesInMemory:234
 » NullPointer

ClosureCleanerITCase.after:52->TestBaseUtils.compareResultsByLinesInMemory:223->TestBaseUtils.compareResultsByLinesInMemory:234
 » NullPointer
```
I can't figure out why this occurs.


---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-02-03 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-72752968
  
@qmlmoon Sweet.

@rmetzger Errors should show up on the console now, and in the .out file, and I suppose by extension in the .log file as well.


---


[GitHub] flink pull request: [FLINK-1398] Introduce extractSingleField() in...

2015-01-18 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/308#issuecomment-70413423
  
@StephanEwen Would that look like this?
```java
reverse(dataset.map(...)).filter(...)
```

I've been thinking about the API overload issue a bit: could a DataSet have a field that offered more functions? Something like:
```java
dataset.map().ext.reverse().filter()
```
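
As a hedged sketch of that idea (all names hypothetical; Flink's DataSet has no such field), the extension accessor could look like this:

```java
class MyDataSet<T> {
    // nested accessor exposing the less common operations
    final Ext<T> ext = new Ext<>(this);

    MyDataSet<T> map()    { return this; } // stand-ins for the regular API
    MyDataSet<T> filter() { return this; }

    static class Ext<T> {
        private final MyDataSet<T> set;
        Ext(MyDataSet<T> set) { this.set = set; }
        MyDataSet<T> reverse() { return set; } // extended op hands back the plain API
    }
}
```

This would keep the core API surface small while still allowing the `dataset.map().ext.reverse().filter()` style of chaining.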


---


[GitHub] flink pull request: [FLINK-785] Chained AllReduce

2015-02-11 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/370#issuecomment-73923184
  
Sorry for not making it clear that I force-pushed some changes; I forgot to make a separate branch.

The changes you made to the driver are already in, sorry for wasting your time there.
I will include the changes to the test case.

The ChainedAllReduceDriver currently does not cause any problems.
I've added a ChainedAllGroupReduceCombineDriver as well, which fails for one test case:
```

ReplicatingDataSourceITCase.after:69->TestBaseUtils.compareResultsByLinesInMemory:223->TestBaseUtils.compareResultsByLinesInMemory:238
 arrays first differed at element [0]; expected:<([50050]0)> but was:<([37525]0)>
```
This may be related to FLINK-1521; once the newest test is complete I'll hopefully have a definite answer. When I force objectReuse in the global reduce, all tests pass.


---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-02-11 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-73936205
  
In my case, it returns this:
```python
>>> import socket
>>> socket.gethostname()
'Linux'
>>> socket.getaddrinfo(socket.gethostname(), None, socket.AF_INET, socket.SOCK_DGRAM)
[(2, 2, 17, '', ('127.0.1.1', 0))]
```

127.0.1.1 is also present in my /etc/hosts.

I assume they are different because no error is printed. I tried several different approaches, and when I got no (Python) error and only the timeout, further investigation showed that the resolved IP did not match 127.0.1.1.



---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-02-11 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-73945405
  
@mxm I've added another potential fix.


---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-02-10 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-73731632
  
If you don't mind, try the example again and let it run. Both processes should time out after 5 minutes, throwing exceptions that hopefully point to the origin of the lock.


---


[GitHub] flink pull request: [FLINK-785] Chained AllReduce / AllGroupReduce...

2015-02-12 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/370#discussion_r24633778
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllGroupReduceCombineDriver.java ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators.chaining;
+
+import java.util.ArrayList;
+
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.RegularPactTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ChainedAllGroupReduceCombineDriver<T> extends ChainedDriver<T, T> {
+   private static final Logger LOG = LoggerFactory.getLogger(ChainedAllGroupReduceCombineDriver.class);
+
+   // ------------------------------------------------------------------------
+
+   private FlatCombineFunction<T> combiner;
+   private TypeSerializer<T> serializer;
+
+   private volatile boolean running = true;
+
+   private final ArrayList<T> values = new ArrayList<T>();
+
+   // ------------------------------------------------------------------------
+
+   @Override
+   public void setup(AbstractInvokable parent) {
+           @SuppressWarnings("unchecked")
+           final FlatCombineFunction<T> com = RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, FlatCombineFunction.class);
+           this.combiner = com;
+           FunctionUtils.setFunctionRuntimeContext(com, getUdfRuntimeContext());
+
+           this.objectReuseEnabled = this.executionConfig.isObjectReuseEnabled();
+
+           final TypeSerializerFactory<T> serializerFactory = this.config.getInputSerializer(0, this.userCodeClassLoader);
+           this.serializer = serializerFactory.getSerializer();
+
+           if (LOG.isDebugEnabled()) {
+                   LOG.debug("ChainedAllGroupReduceCombineDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
+           }
+   }
+
+   @Override
+   public void openTask() throws Exception {
+           final Configuration stubConfig = this.config.getStubParameters();
+           RegularPactTask.openUserCode(this.combiner, stubConfig);
+   }
+
+   @Override
+   public void closeTask() throws Exception {
+           if (!this.running) {
+                   return;
+           }
+           RegularPactTask.closeUserCode(this.combiner);
+   }
+
+   @Override
+   public void cancelTask() {
+           this.running = false;
+   }
+
+   // ------------------------------------------------------------------------
+
+   @Override
+   public Function getStub() {
+           return this.combiner;
+   }
+
+   @Override
+   public String getTaskName() {
+           return this.taskName;
+   }
+
+   // ------------------------------------------------------------------------
+
+   @Override
+   public void collect(T record) {
+           try {
+                   values.add(objectReuseEnabled ? record : serializer.copy(record));
+                   if (values.size() > 1) {
--- End diff --

Is there a more reasonable value for this? (Basically, anything whose reasoning goes beyond "I felt like using 1".)


---

[GitHub] flink pull request: [FLINK-1521] Chained operators respect reuse

2015-02-12 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/392

[FLINK-1521] Chained operators respect reuse



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/incubator-flink flink-1521

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/392.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #392


commit b3f8ec9a107a2baf743468c9efdf28da9f49cf79
Author: zentol s.mo...@web.de
Date:   2015-02-12T19:36:00Z

[FLINK-1521] Chained operators respect reuse




---


[GitHub] flink pull request: [FLINK-785] Chained AllReduce / AllGroupReduce...

2015-02-13 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/370#issuecomment-74240805
  
The SynchronousChainedCombineDriver stores multiple records as well, doesn't it? Except up to a fixed size instead of a number of elements. Could we not do the same here?
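
A hedged sketch of that alternative (names and the 32 KB budget are illustrative, not the actual driver code): buffer records until a serialized-size budget is exhausted, then combine, rather than flushing after a fixed element count.

```java
import java.util.ArrayList;
import java.util.List;

class SizeBoundedBuffer<T> {
    private final List<T> values = new ArrayList<>();
    private final long budgetBytes = 32 * 1024; // illustrative memory budget
    private long usedBytes;

    // in real code the record size would come from the TypeSerializer
    void collect(T record, long recordSize) {
        values.add(record);
        usedBytes += recordSize;
        if (usedBytes > budgetBytes) { // flush on memory pressure, not element count
            combineAndClear();
        }
    }

    private void combineAndClear() {
        // the combine UDF would be invoked on 'values' here
        values.clear();
        usedBytes = 0;
    }
}
```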


---


[GitHub] flink pull request: [FLINK-785] Chained AllReduce / AllGroupReduce...

2015-02-19 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/370#issuecomment-75030486
  
Okay, I will remove the chained combiner.

I'm curious though: the issue description specifically mentions the AllGroupReduce greatly benefiting from a chained version. Did (or do) you have a specific approach in mind?


---


[GitHub] flink pull request: [FLINK-1521] Chained operators respect reuse

2015-02-19 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/392#issuecomment-75029399
  
Alright, I'm closing this PR then.


---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-01-27 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-71611872
  
Tests run on Travis (they don't right now because Fabian merged something that changes the CSVInputFormat constructor, which breaks stuff on my end), but see
https://travis-ci.org/zentol/incubator-flink/jobs/48334902
and search for "Running org.apache.flink.languagebinding.api.java.python.PythonPlanBinderTest".

Putting it under flink-python means splitting it from the generic interface, right? That would be necessary in the long run anyway, so I'm all for it.

@dan-blanchard The generic interface is not just for Python; it reduces the amount of code you have to write in Java by a pretty high amount. But it sets up some requirements, most prominently support for binary data, memory-mapped files, and sockets, though it would be possible to provide different options here. It is hard for me to assess how difficult adding another language would be: the generic and Python parts were coded and evolved simultaneously, and when something didn't fit I could just change it. I think it's very likely that when someone wants to add another language we'll have to revisit a few things, but it provides a good starting point.




---


[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-28 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/339#issuecomment-71825712
  
Well, you sure know how to keep me busy :)

You are right about moving it back. Updated.



---


[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...

2015-01-26 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/339

[FLINK-1419] [runtime] DC properly synchronized

Addresses the issue of files not being preserved in subsequent operations.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/incubator-flink dc_cache_fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/339.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #339


commit 5c9059d3ce58d8415ce374927dd253579a5fd741
Author: zentol s.mo...@web.de
Date:   2015-01-26T10:07:53Z

[FLINK-1419] [runtime] DC properly synchronized




---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-02-09 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-73505531
  
@qmlmoon has provided TPC-H Query 3 / 10 and WebLogAnalysis examples.


---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-03-10 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/202#discussion_r26102129
  
--- Diff: flink-addons/flink-language-binding/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py ---
@@ -0,0 +1,247 @@

+
--- End diff --

The license issue is still unresolved.


---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-03-04 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-77140113
  
And here I thought I was being clever by swapping to built-in functions.

Addressed both issues.


---


[GitHub] flink pull request: [FLINK-1422] Add withParameters() to documenta...

2015-01-29 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/350#issuecomment-72105374
  
Gotcha, I'll address the points mentioned.


---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-04-14 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-92765309
  
Now uses TCP to exchange signals.


---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-04-20 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-94403584
  
Yes, that is correct.


---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-04-20 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-94397131
  
All issues that I'm aware of are resolved.


---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-04-20 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-94402062
  
@aljoscha The timeout is removed. Data transfer is still done with memory-mapped files; access to these files is synchronized using TCP. I'm not sure what you mean by your last sentence.


---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-04-21 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-94771487
  
Nah, I'll do it.


---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-04-21 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-94760254
  
@rmetzger Done. Unless you want me to merge commits as well.


---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-04-21 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-94774209
  
Done


---


[GitHub] flink pull request: [FLINK-1924] Minor Refactoring

2015-04-22 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/616

[FLINK-1924] Minor Refactoring

This PR resolves a few minor issues, including:
- formatting
- simpler Python process initialization
- renaming of the Python connection following the switch to TCP

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink python_refactor2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/616.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #616


commit 81d4c197bab6de4fb45f19ba2ca06f1f042c1812
Author: zentol s.mo...@web.de
Date:   2015-03-27T10:53:23Z

[FLINK-1924] Minor Refactoring




---


[GitHub] flink pull request: [FLINK-1927] [py] Operator distribution rework

2015-04-29 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/638#issuecomment-97389037
  
I didn't check performance; it shouldn't have any noticeable effect.


---


[GitHub] flink pull request: [FLINK-1927] [py] Operator distribution rework

2015-04-29 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/638#issuecomment-97417603
  
@aljoscha That variable must be declared somewhere within the plan file. During the plan rebuild it would be executed as well, so I don't think this is a problem. In fact, I think this wouldn't work *before* this PR either.


---


[GitHub] flink pull request: [FLINK-1927] [py] Operator distribution rework

2015-04-29 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/638#issuecomment-97418740
  
Oh snap, I just noticed a big flaw... well, let's put this PR on hold for a bit.

I'm simply re-executing the plan file on each node, but forgot to deal with arguments that were passed to the file -.-


---


[GitHub] flink pull request: [FLINK-1927] [py] Operator distribution rework

2015-04-28 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/638

[FLINK-1927] [py] Operator distribution rework

Python operators are no longer serialized and are instead rebuilt on each node. This also means that the dill library is no longer necessary.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink papipr_operator

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/638.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #638


commit b587a50b6ca8564a9e246e2d58d1c0cee125fdca
Author: zentol s.mo...@web.de
Date:   2015-04-19T08:07:38Z

[FLINK-1927] [py] Operator distribution rework




---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-04-08 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-90859901
  
I can't say for sure whether the current timeout is enough; we don't have enough data for that.

We could make it configurable; that way a user can just increase it if the timeout fires without the job actually deadlocking.


---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-04-08 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-90865196
  
I guess it's nice to have in case the Java side dies *somehow* without calling close() on the Java function.


---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-04-08 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-90844274
  
The Python process is now being terminated using kill -9 instead of process.destroy(). This should prevent Python processes from lingering indefinitely.


---


[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...

2015-04-08 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/202#issuecomment-90921879
  
The timeout measures how long Java or Python is stuck in a blocking UDP operation; this generally means how long it takes for the Python side to compute one chunk of data. The Java side sends a chunk and then waits for the next signal; if this takes too long, the timeout fires. It's quite fickle, to be honest, and without a regular heartbeat one can always construct scenarios where it will break the job.

I *think* TCP would cover it.

Thanks for the hint to remove the shutdown hook; addressed.
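
For illustration, a hedged sketch of the timeout semantics described above; the port and timeout values are made up, and this is not the actual language-binding code:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;

class SignalReader {
    static int awaitSignal() throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress("localhost", 25000));
            socket.setSoTimeout(300_000); // give up if no signal arrives within 5 minutes
            return socket.getInputStream().read(); // blocks until the Python side writes
        } catch (SocketTimeoutException e) {
            throw new IOException("Python process took too long to respond", e);
        }
    }
}
```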


---


[GitHub] flink pull request: [FLINK-1927] [py] Operator distribution rework

2015-05-20 Thread zentol
Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/638


---


[GitHub] flink pull request: [FLINK-1927] [py] Operator distribution rework

2015-05-20 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/638#issuecomment-103813351
  
Implementing this in a clean way has become trickier than I initially expected, so I'll postpone it and close this PR for now.


---


[GitHub] flink pull request: [FLINK-2448]Clear cache file list in Execution...

2015-08-17 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1031#issuecomment-131769530
  
Does this work properly with collect() calls? As in, would the following plan still work?

```
env = ..
env.registerCacheFile()
...
someSet.collect()
doSomethingThatUsesTheCacheFile
env.execute()
```

If we wipe all cache entries in the collect() call, the files will not be registered in the execute(), right? The plans these methods create are separate, I think.


---


[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1017#discussion_r37143081
  
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---
@@ -27,114 +26,65 @@
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {
 
-   private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
-
    private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
+   
+   private volatile boolean running = true;
 
    @Override
-   public void registerInputOutput() {
-           try {
-                   super.registerInputOutput();
+   public void init() throws Exception {
+           TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+           TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);

-                   TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
-                   TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+           int numberOfInputs = configuration.getNumberOfInputs();

-                   int numberOfInputs = configuration.getNumberOfInputs();
+           ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+           ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();

-                   ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
-                   ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+           List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);

-                   List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-                   
-                   for (int i = 0; i < numberOfInputs; i++) {
-                           int inputType = inEdges.get(i).getTypeNumber();
-                           InputGate reader = getEnvironment().getInputGate(i);
-                           switch (inputType) {
-                                   case 1:
-                                           inputList1.add(reader);
-                                           break;
-                                   case 2:
-                                           inputList2.add(reader);
-                                           break;
-                                   default:
-                                           throw new RuntimeException("Invalid input type number: " + inputType);
-                           }
+           for (int i = 0; i < numberOfInputs; i++) {
+                   int inputType = inEdges.get(i).getTypeNumber();
+                   InputGate reader = getEnvironment().getInputGate(i);
+                   switch (inputType) {
+                           case 1:
+                                   inputList1.add(reader);
+                                   break;
+                           case 2:
--- End diff --

I don't think that's necessary, as it is just an index starting at 1. The possible values 1 and 2 are clearly related to which inputList the reader is added to.


---


[GitHub] flink pull request: [FLINK-2530]optimize equal() of AcknowledgeChe...

2015-08-16 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1024#issuecomment-131512254
  
I don't think these statements are equivalent.

Assume that this.state == null and that.state != null.

In the original version we evaluate that.state == null, which is false, so the overall result is false.

In your version we would evaluate (this.state == null || this.state.equals(that.state)), which is true, making the overall result true.

Unless I made a mistake, -1.
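
To make the asymmetry concrete, a minimal sketch of the two variants (only the state field is from the PR; everything else is hypothetical):

```java
class EqualsCheck {
    // original: both null, or both non-null and equal
    static boolean original(Object thisState, Object thatState) {
        return (thisState == null && thatState == null)
            || (thisState != null && thatState != null && thisState.equals(thatState));
    }

    // proposed: this.state == null || this.state.equals(that.state)
    static boolean proposed(Object thisState, Object thatState) {
        return thisState == null || thisState.equals(thatState);
    }

    public static void main(String[] args) {
        Object a = null, b = "some state";
        System.out.println(original(a, b)); // false
        System.out.println(proposed(a, b)); // true, the inequality described above
    }
}
```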


---


[GitHub] flink pull request: [FLINK-2530]optimize equal() of AcknowledgeChe...

2015-08-16 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1024#issuecomment-131534953
  
Looking at the pure logic this would work, but you can't remove that.state != null, since that could result in a NullPointerException inside equals().


---


[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r37144152
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -897,7 +897,7 @@ class TaskManager(
 config.timeout,
 libCache,
 fileCache,
-runtimeInfo)
+new TaskRuntimeInfo(hostname, taskManagerConfig, tdd.getAttemptNumber))
--- End diff --

Generally we try to keep one PR per issue; exceptions should only be made for closely related issues.

Why did you decide to add these issues to this PR? I have a hard time understanding it, since the commits barely touch the same files.


---


[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r37144275
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -897,7 +897,7 @@ class TaskManager(
 config.timeout,
 libCache,
 fileCache,
-runtimeInfo)
+new TaskRuntimeInfo(hostname, taskManagerConfig, tdd.getAttemptNumber))
--- End diff --

I would prefer if you opened a second PR once this is merged. The issues are not really related to each other; the 2nd commit was simply made based on the 1st commit. We would end up having two separate discussions in one PR, which I think is a bad idea.


---


[GitHub] flink pull request: [FLINK-2565] Support primitive Arrays as keys

2015-08-23 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/1043

[FLINK-2565] Support primitive Arrays as keys

Adds a comparator and test for every primitive array type.
Modifies the CustomType2 class in GroupingTest to retain a field with an 
unsupported type.
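
A hedged sketch of what such a comparator boils down to (a simplified stand-in, not the comparator added by this PR): compare element-wise, then by length, so primitive arrays gain a total order usable for keys.

```java
import java.util.Comparator;

class IntArrayComparator implements Comparator<int[]> {
    @Override
    public int compare(int[] a, int[] b) {
        int min = Math.min(a.length, b.length);
        for (int i = 0; i < min; i++) {
            int cmp = Integer.compare(a[i], b[i]); // element-wise first
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length); // shorter array sorts first
    }
}
```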

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 2565_arrayKey

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1043.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1043


commit 7551a47e60186a91ecc1df364f1a3ae0c9474a3f
Author: zentol s.mo...@web.de
Date:   2015-08-23T13:36:47Z

[FLINK-2565] Support primitive Arrays as keys




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2557] TypeExtractor properly returns Mi...

2015-08-23 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/1045

[FLINK-2557] TypeExtractor properly returns MissingTypeInfo

This fix is not really obvious so let me explain:

getParameterType() is called from two different places in the TypeExtractor: 
to validate the input type and to extract the output type.

Both cases consider the possibility that getParameterType() fails, but 
check for different exceptions. 

The TypeExtractor only returns a MissingTypeInfo if it encounters an 
InvalidTypesException; IllegalArgumentExceptions are not caught. This is what 
@mjsax encountered.
Changing the exception type causes the TypeExtractor to properly return a 
MissingTypeInfo, which is later overridden by the returns(...) call.

In order for the input validation to still work properly as well, it now 
catches InvalidTypesExceptions instead.
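
A rough sketch of that control flow, assuming flink-core/flink-java on the 
classpath; doExtract() is a hypothetical stand-in for the internal extraction 
logic, not an actual TypeExtractor method:

    import org.apache.flink.api.common.functions.InvalidTypesException;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.MissingTypeInfo;

    public class ExtractorSketch {
        // Stand-in for the real internal extraction logic.
        static TypeInformation<?> doExtract(String functionName) {
            throw new InvalidTypesException("type could not be determined for " + functionName);
        }

        static TypeInformation<?> extractOrMissing(String functionName) {
            try {
                return doExtract(functionName);
            } catch (InvalidTypesException e) {
                // Placeholder that is later overridden by an explicit
                // returns(...) call on the operator.
                return new MissingTypeInfo(functionName, e);
            }
        }
    }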

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 2557_types

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1045.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1045


commit 1c1dc459915c875ab0a4412aa3ef0a844f092171
Author: zentol s.mo...@web.de
Date:   2015-08-23T19:41:44Z

[FLINK-2557] TypeExtractor properly returns MissingTypeInfo




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2556] Refactor/Fix pre-flight Key valid...

2015-08-23 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/1044

[FLINK-2556] Refactor/Fix pre-flight Key validation

Removed redundant key validation in DistinctOperator
Keys constructors now make sure the type of every key is an instance of 
AtomicType/CompositeType, and that type.isKeyType() is true.
Additionally, the ExpressionKeys int[] constructor explicitly rejects Tuple0.
Changes one test that tried something that shouldn't work in the 
first place.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink isKeyType_check

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1044.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1044


commit 7a57b6ef2ecdc7adaf770f8585cf8f974c684705
Author: zentol s.mo...@web.de
Date:   2015-08-23T13:57:34Z

[FLINK-2556] Refactor/Fix pre-flight Key validation




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2557] TypeExtractor properly returns Mi...

2015-08-24 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1045#issuecomment-134203619
  
InvalidTypesException (from here on abbreviated as ITE) is no longer 
unchecked.

For this to work I had to make changes in surprisingly many classes, so 
let's break it down:

* In general, InvalidProgramExceptions are now thrown for unrecoverable 
conditions where previously an ITE would be thrown.

* TypeExtractor methods that return a TypeInformation will never throw an 
ITE; instead they return a MissingTypeInfo where that is allowed, otherwise an 
InvalidProgramException.
* One public TypeExtractor method now throws an ITE, namely 
getParameterType(). This method is heavily used in the TypeExtractor itself as 
well, so I didn't see a non-API-breaking way to fix this that doesn't 
rely yet again on unchecked exceptions.
 * affects hadoop-compatibility functions, which now catch them

* A few operator classes caught ITEs in returns(TypeInformation ...) and 
then manually created a MissingTypeInfo. This now happens in the TypeExtractor 
directly.

* TypeInformation classes threw an ITE in getInfoFor(Class ...) if the 
given class didn't match the TypeInformation class. These have been changed to 
IllegalArgumentExceptions.

* DataSet/DataStream threw an ITE in getType() if the type was a 
MissingTypeInfo; changed to InvalidProgramException.
 * similar issue in StreamExecutionEnvironment


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2556] Refactor/Fix pre-flight Key valid...

2015-08-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1044#discussion_r37766287
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java ---
@@ -209,6 +209,9 @@ public ExpressionKeys(int[] groupingFields, TypeInformation<T> type, boolean all
			throw new InvalidProgramException("Specifying keys via field positions is only valid " +
					"for tuple data types. Type: " + type);
		}
+		if (type.getArity() == 0) {
+			throw new InvalidProgramException("Tuple size must be greater than 0. Size: " + type.getArity());
--- End diff --

ahh... it can be a key, but you can't group on a DataSet<Tuple0>.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2556] Refactor/Fix pre-flight Key valid...

2015-08-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1044#discussion_r37766532
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java ---
@@ -209,6 +209,9 @@ public ExpressionKeys(int[] groupingFields, TypeInformation<T> type, boolean all
			throw new InvalidProgramException("Specifying keys via field positions is only valid " +
					"for tuple data types. Type: " + type);
		}
+		if (type.getArity() == 0) {
+			throw new InvalidProgramException("Tuple size must be greater than 0. Size: " + type.getArity());
--- End diff --

this check was added specifically for Tuple0, as in the current code you'd 
get weird exceptions when grouping on a Tuple0.
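
For illustration, a sketch of a program the added check now rejects up front 
(assuming flink-java on the classpath; the exact error before the change 
depended on the Flink version):

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple0;

    public class Tuple0GroupingSketch {
        public static void main(String[] args) {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // With the arity check, grouping a DataSet<Tuple0> by a field
            // position fails fast with an InvalidProgramException ("Tuple
            // size must be greater than 0") instead of a confusing
            // downstream error.
            env.fromElements(new Tuple0()).groupBy(0);
        }
    }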


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2565] Support primitive Arrays as keys

2015-08-24 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1043#issuecomment-134345186
  
I've added a test case to make sure a primitive array is accepted as a key. 
Is that what you had in mind, @tillrohrmann?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2448]Clear cache file list in Execution...

2015-08-20 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1031#issuecomment-132917505
  
Using clearSinks would cause the above example with collect() to fail, 
since this clears sinks as well. (The sinks are only not cleared when 
getExecutionPlan is called; it has nothing to do with a new execution.)

Just checking the file path wouldn't work in my case: I distribute a folder 
that resides in the same location but whose content varies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2077] [core] Rework Path class and add ...

2015-08-19 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1035#issuecomment-132481210
  
The changes I see are:
* removed hashCode()
* moved makeQualified to a new file as a static method
* reordered the remaining methods in Path

Is that about right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2534][RUNTIME]Improve in CompactingHash...

2015-08-17 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1029#discussion_r37166847
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
 ---
@@ -223,7 +223,7 @@ public CompactingHashTable(TypeSerializer<T> buildSideSerializer,
// check the size of the first buffer and record it. all 
further buffers must have the same size.
// the size must also be a power of 2
this.segmentSize = memorySegments.get(0).size();
-   if ( (this.segmentSize & this.segmentSize - 1) != 0) {
+   if ((this.segmentSize & this.segmentSize - 1) != 0) {
--- End diff --

Please avoid tiny formatting changes; they just clutter up the diff.
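
For context, the condition in the quoted hunk is the standard power-of-two 
bit trick; a standalone sketch:

    public class PowerOfTwoCheck {
        // A positive integer n is a power of two exactly when it has a
        // single bit set, i.e. (n & (n - 1)) == 0.
        static boolean isPowerOfTwo(int n) {
            return n > 0 && (n & (n - 1)) == 0;
        }

        public static void main(String[] args) {
            System.out.println(isPowerOfTwo(32768)); // true  (a typical segment size)
            System.out.println(isPowerOfTwo(30000)); // false
        }
    }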


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2534][RUNTIME]Improve in CompactingHash...

2015-08-17 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1029#discussion_r37166816
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
 ---
@@ -406,9 +403,7 @@ public void insertOrReplaceRecord(T record) throws 
IOException {
return;
}
}
-   else {
-   numInSegment++;
-   }
+   numInSegment++;
--- End diff --

This is not equivalent to the old implementation: numInSegment is no 
longer incremented when 
this.buildSideComparator.equalToReference(valueAtPosition) is true, because we 
return.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2534][RUNTIME]Improve in CompactingHash...

2015-08-17 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1029#discussion_r37167787
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
 ---
@@ -406,9 +403,7 @@ public void insertOrReplaceRecord(T record) throws 
IOException {
return;
}
}
-   else {
-   numInSegment++;
-   }
+   numInSegment++;
--- End diff --

Ah, nevermind then.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2557] TypeExtractor properly returns Mi...

2015-08-24 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1045#issuecomment-134129643
  
I agree, will get right on it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2565] Support primitive Arrays as keys

2015-08-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1043#discussion_r37729559
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparator.java
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.BooleanComparator;
+
+public class BooleanPrimitiveArrayComparator extends PrimitiveArrayComparator<boolean[], BooleanComparator> {
+   public BooleanPrimitiveArrayComparator(boolean ascending) {
+   super(ascending, new BooleanComparator(ascending));
+   }
+
+   @Override
+   public int hash(boolean[] record) {
+   int result = 0;
+   for (boolean field : record) {
+   result += comparator.hash(field);
+   }
+   return result;
+   }
+
+   @Override
+   public int compare(boolean[] first, boolean[] second) {
+   for (int x = 0; x < min(first.length, second.length); x++) {
+   int cmp = comparator.compare(first[x], second[x]);
+   if (cmp != 0) {
+   return cmp;
--- End diff --

No, because the underlying comparator has applied the ascending flag already.

It's always a headache thinking about the comparator logic :(
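
A minimal sketch of the point, using java.util.Comparator in place of 
Flink's comparators: the element comparator already honors the 
ascending/descending flag, so the array comparator must return cmp unchanged.

    import java.util.Comparator;

    public class ArrayComparatorSketch {
        // Lexicographic comparison delegating to an element comparator that
        // has the ordering flag baked in; negating cmp here would apply the
        // flag twice and undo it.
        static int compare(boolean[] first, boolean[] second, Comparator<Boolean> elementComparator) {
            for (int x = 0; x < Math.min(first.length, second.length); x++) {
                int cmp = elementComparator.compare(first[x], second[x]);
                if (cmp != 0) {
                    return cmp; // no extra sign flip
                }
            }
            return first.length - second.length;
        }

        public static void main(String[] args) {
            Comparator<Boolean> descending = Comparator.<Boolean>naturalOrder().reversed();
            // false < true naturally, so under descending order {true} sorts first:
            System.out.println(compare(new boolean[]{true}, new boolean[]{false}, descending)); // negative
        }
    }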


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1681] Remove Record API from jdbc modul...

2015-08-04 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/982

[FLINK-1681] Remove Record API from jdbc module



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 1681_jdbc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/982.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #982


commit 6050f224235ed630fb5e5a6c57320eb9f4c7d272
Author: zentol s.mo...@web.de
Date:   2015-08-04T10:45:22Z

[FLINK-1681] Remove Record API from jdbc module




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1882] Removed RemotedCollector

2015-08-04 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/985

[FLINK-1882] Removed RemotedCollector



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 1882_remoteCollector

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/985.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #985


commit 5407804d550b143dcdf1b1fdb011e52f7555b983
Author: zentol s.mo...@web.de
Date:   2015-07-25T13:17:55Z

[FLINK-1882] Removed RemotedCollector




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2483]Add default branch of switch(sched...

2015-08-04 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/984#issuecomment-127595762
  
This change is not necessary IMO. Both in ExecutionGraph and JobGraph, 
scheduleMode is initialized to a proper ScheduleMode. setScheduleMode requires 
a ScheduleMode as an argument, so there's no issue there either. The only 
possible problem I can see is passing null to setScheduleMode, but that's 
caught in the switch statement anyway.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2432] Custom serializer support

2015-08-05 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/962#discussion_r36296145
  
--- Diff: 
flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java
 ---
@@ -253,6 +257,18 @@ public ByteBuffer serialize(T value) {
public abstract void serializeInternal(T value);
}
 
+   private class CustomTypeSerializer extends Serializer<CustomTypeWrapper> {
+   public CustomTypeSerializer() {
+   super(0);
+   }
+   @Override
+   public void serializeInternal(CustomTypeWrapper value) {
+   byte[] bytes = value.getData();
+   buffer = ByteBuffer.allocate(bytes.length);
--- End diff --

Good point!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Serialized String comparison, Unicode support

2015-08-05 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/4#issuecomment-128009698
  
Oh well, this has been a while; let's see...

From what I can tell the comparison doesn't work on code points but 
compares single bytes, so it should be equivalent to the Java String 
comparison. It depends on the serialization code though, so it couldn't be 
merged without modifications.

I can only guess why I decided to use code points during serialization; I 
assume it had to do with the length of the string. Since some Unicode chars 
are represented as 2 chars internally, just counting the chars written would 
lead to a wrong result. Here we are also dealing with a CharSequence, which 
doesn't hide the Unicode aspect like a String does.
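
A standalone illustration of the char-vs-code-point mismatch:

    public class CodePointLength {
        public static void main(String[] args) {
            // U+1F600 lies outside the BMP, so Java stores it as a surrogate
            // pair of two chars; counting chars over-reports the number of
            // characters actually written.
            String s = "a\uD83D\uDE00"; // "a" followed by U+1F600
            System.out.println(s.length());                      // 3 chars
            System.out.println(s.codePointCount(0, s.length())); // 2 code points
        }
    }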


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2431] Refactor PlanBinder/OperationInfo

2015-07-30 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/961

[FLINK-2431] Refactor PlanBinder/OperationInfo

PlanBinder methods were restructured to make the class more readable.

Keys are now stored as strings to simplify string-key-expression support.
Parameter retrieval was moved from PlanBinder to the OperationInfo constructor, 
similar to PythonOperationInfo.
This change reduces the clutter in PlanBinder and allows code reuse for 
operations with similar parameters.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 2431_pr

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/961.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #961


commit 908151add9f411309042c11091981dd577a3282d
Author: zentol s.mo...@web.de
Date:   2015-07-30T09:18:35Z

[FLINK-2431] Refactor PlanBinder/OperationInfo

PlanBinder methods were restructured to make the class more readable.

Keys are now stored as strings to simplify string-key-expression support.
Parameter retrieval was moved from PB to OI constructor, similiar to 
PythonOperationInfo.
This change reduces the clutter in Planbinder and allows code reusage for 
operations with similar parameters.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1927][py] Operator distribution rework

2015-07-29 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/931#issuecomment-125983186
  
Thanks for the review @mxm .

I've addressed the cosmetic issue you mentioned, and added a small fix for 
a separate issue as well (error reporting was partially broken).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2432] Custom serializer support

2015-08-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/962#discussion_r36395811
  
--- Diff: 
flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_custom.py
 ---
@@ -0,0 +1,76 @@
+# 
###
--- End diff --

Actually, I'm just gonna move this code into the test_main file; I was 
going to do that in my next test-centric PR anyway.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-06 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-128309202
  
If you think it was necessary, why was your first step to remove its 
usage...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2494 ]Fix StreamGraph getJobGraph bug

2015-08-06 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/998#issuecomment-128605743
  
I would assume that forceCheckpoint is supposed to do exactly that: enforce 
checkpointing regardless of whether it is supported.

This change also means that if checkpointing is enabled, but not forced, 
the job will not hit an UnsupportedOperationException, which doesn't make any 
sense whatsoever.

-1
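
A hypothetical sketch of the semantics being defended; method and flag names 
are assumptions, not the actual StreamGraph API:

    public class CheckpointValidationSketch {
        static void validate(boolean checkpointingSupported, boolean checkpointingEnabled, boolean forced) {
            if (!checkpointingSupported && checkpointingEnabled && !forced) {
                // Removing this path would silently accept configurations
                // that checkpointing does not actually support.
                throw new UnsupportedOperationException(
                        "Checkpointing is not supported for this topology; force it to override.");
            }
        }

        public static void main(String[] args) {
            validate(false, true, true);  // forced: accepted at the user's own risk
            validate(false, true, false); // throws UnsupportedOperationException
        }
    }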


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2432] Custom serializer support

2015-08-07 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/962#issuecomment-128681688
  
I've addressed the mentioned issues, and added a CUSTOM type constant, so 
that users don't have to constantly create new instances of their classes just 
to pass them as a type parameter.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r36512933
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
 ---
@@ -501,4 +536,22 @@ public int getSuperstepNumber() {
return (T) previousAggregates.get(name);
}
}
+
+   private static final class DoingNothing implements Callable<Path> {
+   private Path entry;
+
+   public DoingNothing(Path entry){
+   this.entry = entry;
+   }
+
+   @Override
+   public Path call() throws IOException{
+   try{
+   LocalFileSystem fs = (LocalFileSystem) 
entry.getFileSystem();
+   return entry.isAbsolute() ? new 
Path(entry.toUri().getPath()): new Path(fs.getWorkingDirectory(),entry);
+   } catch (ClassCastException e){
+   throw new RuntimeException("Collection 
execution must have only local file paths");
--- End diff --

I dislike this error message; there is no apparent relation to the 
distributed cache. 
How about "The DistributedCache only supports local files for Collection 
Environments"?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...

2015-08-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/970#discussion_r36512186
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
 ---
@@ -37,18 +37,17 @@
	private final HashMap<String, Object> initializedBroadcastVars = new HashMap<String, Object>();

	private final HashMap<String, List<?>> uninitializedBroadcastVars = new HashMap<String, List<?>>();
-	
-	
+
	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
			ExecutionConfig executionConfig, Map<String, Accumulator<?,?>> accumulators) {
-		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators);
+		this(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig,
+			new HashMap<String, Future<Path>>(), accumulators);
	}
-	
+
	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
			ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators) {
		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators, cpTasks);
	}
-   
--- End diff --

Here we have a few unnecessary formatting changes that just clutter the 
diff.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

2015-08-06 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/992#issuecomment-128301542
  
if you remove that check, retryForever is unused and can be removed 
completely.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1927][py] Operator distribution rework

2015-07-22 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/931

[FLINK-1927][py] Operator distribution rework

Python operators are no longer serialized and shipped across the
cluster. Instead, the plan file is executed on each node and the
respective operator object is used.

Removed the dill library.
Also fixes [FLINK-2173] by always passing file paths explicitly to Python.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink python_operator4

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/931.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #931


commit 40fd3501cacb7b382c7265f0370d0f94887b7e85
Author: zentol s.mo...@web.de
Date:   2015-07-21T19:22:19Z

[FLINK-1927][py] Operator distribution rework

Python operators are no longer serialized and shipped across the
cluster. Instead the plan file is executed on each node, followed by
usage of the respective operator object.

removed dill library
[FLINK-2173] filepaths are always explicitly passed to python




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2565] Support primitive Arrays as keys

2015-08-25 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1043#issuecomment-134618042
  
@StephanEwen I've reimplemented hashCode() and compare() accordingly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2565] Support primitive Arrays as keys

2015-08-25 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1043#issuecomment-134663435
  
@fhueske Added the test you requested.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2668] Chained Projections are no longer...

2015-10-22 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1279#issuecomment-150177113
  
@fhueske I've made the changes you suggested


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1982] [record-api] Remove dependencies ...

2015-10-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1294#discussion_r42808744
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java 
---
@@ -39,7 +38,7 @@
 import org.junit.Test;
 
 @SuppressWarnings("deprecation")
--- End diff --

this suppression can be removed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2901] Remove Record API dependencies fr...

2015-10-28 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/1306

[FLINK-2901] Remove Record API dependencies from flink-tests

This PR removes all dependencies on the Record API in the flink-tests 
project.

For this, tests were either ported or removed.

The following files were NOT ported:
* recordJobs/*
 * except kmeans/udfs CoordVector, PointInFormat (moved & ported)
 * except util/InfiniteIntegerInputFormat (moved & ported)
* recordJobTests/*
* operators/*
 *  except CoGroupSortITCase, MapPartitionITCase, ObjectReuseITCase (moved)
* accumulators/AccumulatorIterativeITCase
 * Unfinished test, should be addressed in a separate issue
* IterationTerminationWithTwoTails
 * nigh-identical to IterationTerminationWithTerminationTail (the ported 
version also failed)
* iterative/DeltaPageRankITCase, IterativeKMeansITCase, KMeansITCase
 * behaved like they belong in /recordJobTests/ (as in they simply 
execute a record job), thus removed
* optimizer/iterations/IterativeKMeansITCase
 * overlaps with ConnectedComponentsITCase (need a second opinion on this!)
* util/FailingTestBase
 * integrated into TaskFailureITCase (only user of the class)
* testPrograms/util/tests/*

cancelling/MatchJoinCancellingITCase was ported but disabled, since it is 
unstable and was also disabled before. A separate issue seems appropriate.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 2901_record_tests

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1306.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1306


commit ae3e5d64240feb0f2d9f5b74b709567ee3c66ec4
Author: Fabian Hueske <fhue...@apache.org>
Date:   2015-10-22T19:13:59Z

Remove ITCases for Record API operators

commit 88fcd3419774a48bc42c21fc46a745129bc4c0ac
Author: Fabian Hueske <fhue...@apache.org>
Date:   2015-10-22T19:15:38Z

Move operator ITCases into correct package

commit 4b9cecbe747d1131849070223821cbb4446a2a48
Author: Fabian Hueske <fhue...@apache.org>
Date:   2015-10-22T19:18:04Z

Remove Record API dependencies from CC iteration tests

commit 75052028dc8f29b4280d9c29d61fcb6f6c00ff0a
Author: Fabian Hueske <fhue...@apache.org>
Date:   2015-10-22T19:19:34Z

Remove Record API dependencies from WordCount compiler test

commit 91ca56c0df8a07cd142fd764b13af6ea3d7dae8a
Author: Fabian Hueske <fhue...@apache.org>
Date:   2015-10-22T19:20:34Z

Remove Record API program tests

commit 491afa1ea220f5292c36910d5c4f0c2491edc859
Author: zentol <s.mo...@web.de>
Date:   2015-10-25T14:07:37Z

Remove deprecation warning suppressions

commit a32a0d1b33c7ffcd4a997fa1cdd11fe299bc7e60
Author: zentol <s.mo...@web.de>
Date:   2015-10-27T20:22:38Z

[FLINK-2901] Remove Record API dependencies from flink-tests #1

commit 0600da3154665aaf1ee02510e24f8222ca12af5f
Author: zentol <s.mo...@web.de>
Date:   2015-10-27T20:22:45Z

[FLINK-2901] Remove Record API dependencies from flink-tests #2




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2692] Untangle CsvInputFormat

2015-11-10 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r44439588
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
 
 package org.apache.flink.api.java.io;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.parser.FieldParser;
 
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
+import java.io.IOException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.StringUtils;
 
-public class CsvInputFormat<OUT> extends CommonCsvInputFormat<OUT> {
+public abstract class CsvInputFormat<OUT> extends 
GenericCsvInputFormat<OUT> {
 
private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   protected transient Object[] parsedValues;

-   public CsvInputFormat(Path filePath, CompositeType<OUT> 
typeInformation) {
-   super(filePath, typeInformation);
+   protected CsvInputFormat(Path filePath) {
+   super(filePath);
}
-   
-   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, CompositeType<OUT> typeInformation) {
-   super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   super.open(split);
+
+   @SuppressWarnings("unchecked")
+   FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) 
getFieldParsers();
+
+   //throw exception if no field parsers are available
+   if (fieldParsers.length == 0) {
+   throw new 
IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to 
parse input");
+   }
+
+   // create the value holders
+   this.parsedValues = new Object[fieldParsers.length];
+   for (int i = 0; i < fieldParsers.length; i++) {
+   this.parsedValues[i] = fieldParsers[i].createValue();
+   }
+
+   // left to right evaluation makes access [0] okay
+   // this marker is used to fasten up readRecord, so that it 
doesn't have to check each call if the line ending is set to default
+   if (this.getDelimiter().length == 1 && this.getDelimiter()[0] 
== '\n' ) {
+   this.lineDelimiterIsLinebreak = true;
+   }
+
+   this.commentCount = 0;
+   this.invalidLineCount = 0;
}
 
@Override
-   protected OUT createTuple(OUT reuse) {
-   Tuple result = (Tuple) reuse;
-   for (int i = 0; i < parsedValues.length; i++) {
-   result.setField(parsedValues[i], i);
+   public OUT nextRecord(OUT record) throws IOException {
+   OUT returnRecord = null;
+   do {
+   returnRecord = super.nextRecord(record);
+   } while (returnRecord == null && !reachedEnd());
+
+   return returnRecord;
+   }
+
+   public Class<?>[] getFieldTypes() {
+   return super.getGenericFieldTypes();
+   }
+
+   protected static boolean[] createDefaultMask(int size) {
+   boolean[] includedMask = new boolean[size];
+   for (int x=0; x<includedMask.length; x++) {
+   includedMask[x] = true;
+   }
+   return includedMask;
+   }
+
+   protected static boolean[] toBooleanMask(int[] sourceFieldIndices) {
--- End diff --

I see your point, but I don't think it's due to this method. It follows a 
similar implementation in GenericCsvInputFormat.setFieldsGeneric that was used 
until now.

The key thing is that previously we checked the indices for monotonically 
increasing order, so the case you described couldn't occur. That check wasn't 
technically necessary, hence I removed it.

We can either re-add that check, or add documentation to the CsvInputFormat 
constructor and the Scala ExecutionEnvironment.readCsvFile method.
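
A minimal sketch (hypothetical, not the actual Flink code) of how such a 
mask can be built, and why, without the order check, duplicate or reordered 
indices are silently collapsed:

    public class FieldMaskSketch {
        // The largest index determines the mask length; every listed index
        // is switched on. {2, 0, 2} yields the same mask as {0, 2}, so the
        // duplicate is lost without an explicit check.
        static boolean[] toBooleanMask(int[] sourceFieldIndices) {
            int max = 0;
            for (int i : sourceFieldIndices) {
                max = Math.max(max, i);
            }
            boolean[] includedMask = new boolean[max + 1];
            for (int i : sourceFieldIndices) {
                includedMask[i] = true;
            }
            return includedMask;
        }

        public static void main(String[] args) {
            System.out.println(java.util.Arrays.toString(toBooleanMask(new int[]{2, 0, 2})));
            // [true, false, true] -- indistinguishable from {0, 2}
        }
    }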


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2692] Untangle CsvInputFormat

2015-11-10 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r44435787
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
 
 package org.apache.flink.api.java.io;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.parser.FieldParser;
 
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
+import java.io.IOException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.StringUtils;
 
-public class CsvInputFormat<OUT> extends CommonCsvInputFormat<OUT> {
+public abstract class CsvInputFormat<OUT> extends 
GenericCsvInputFormat<OUT> {
 
private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   protected transient Object[] parsedValues;

-   public CsvInputFormat(Path filePath, CompositeType<OUT> 
typeInformation) {
-   super(filePath, typeInformation);
+   protected CsvInputFormat(Path filePath) {
+   super(filePath);
}
-   
-   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, CompositeType<OUT> typeInformation) {
-   super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   super.open(split);
+
+   @SuppressWarnings("unchecked")
+   FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) 
getFieldParsers();
+
+   //throw exception if no field parsers are available
+   if (fieldParsers.length == 0) {
+   throw new 
IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to 
parse input");
+   }
+
+   // create the value holders
+   this.parsedValues = new Object[fieldParsers.length];
+   for (int i = 0; i < fieldParsers.length; i++) {
+   this.parsedValues[i] = fieldParsers[i].createValue();
+   }
+
+   // left to right evaluation makes access [0] okay
+   // this marker is used to fasten up readRecord, so that it 
doesn't have to check each call if the line ending is set to default
+   if (this.getDelimiter().length == 1 && this.getDelimiter()[0] 
== '\n' ) {
+   this.lineDelimiterIsLinebreak = true;
+   }
+
+   this.commentCount = 0;
+   this.invalidLineCount = 0;
}
 
@Override
-   protected OUT createTuple(OUT reuse) {
-   Tuple result = (Tuple) reuse;
-   for (int i = 0; i < parsedValues.length; i++) {
-   result.setField(parsedValues[i], i);
+   public OUT nextRecord(OUT record) throws IOException {
+   OUT returnRecord = null;
+   do {
+   returnRecord = super.nextRecord(record);
+   } while (returnRecord == null && !reachedEnd());
+
+   return returnRecord;
+   }
+
+   public Class<?>[] getFieldTypes() {
+   return super.getGenericFieldTypes();
+   }
+
+   protected static boolean[] createDefaultMask(int size) {
--- End diff --

I wanted to cover that case directly in the InputFormat instead of 
*somewhere* else. This method is used to create a mask for exactly that case, 
when we can infer the mask from the number of field types.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2692] Untangle CsvInputFormat

2015-11-10 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r44435810
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
 
 package org.apache.flink.api.java.io;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.parser.FieldParser;
 
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
+import java.io.IOException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.StringUtils;
 
-public class CsvInputFormat<OUT> extends CommonCsvInputFormat<OUT> {
+public abstract class CsvInputFormat<OUT> extends 
GenericCsvInputFormat<OUT> {
 
private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   protected transient Object[] parsedValues;

-   public CsvInputFormat(Path filePath, CompositeType<OUT> 
typeInformation) {
-   super(filePath, typeInformation);
+   protected CsvInputFormat(Path filePath) {
+   super(filePath);
}
-   
-   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, CompositeType<OUT> typeInformation) {
-   super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+   super.open(split);
+
+   @SuppressWarnings("unchecked")
+   FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) 
getFieldParsers();
+
+   //throw exception if no field parsers are available
+   if (fieldParsers.length == 0) {
+   throw new 
IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to 
parse input");
+   }
+
+   // create the value holders
+   this.parsedValues = new Object[fieldParsers.length];
+   for (int i = 0; i < fieldParsers.length; i++) {
+   this.parsedValues[i] = fieldParsers[i].createValue();
+   }
+
+   // left to right evaluation makes access [0] okay
+   // this marker is used to fasten up readRecord, so that it 
doesn't have to check each call if the line ending is set to default
+   if (this.getDelimiter().length == 1 && this.getDelimiter()[0] 
== '\n' ) {
+   this.lineDelimiterIsLinebreak = true;
+   }
+
+   this.commentCount = 0;
+   this.invalidLineCount = 0;
}
 
@Override
-   protected OUT createTuple(OUT reuse) {
-   Tuple result = (Tuple) reuse;
-   for (int i = 0; i < parsedValues.length; i++) {
-   result.setField(parsedValues[i], i);
+   public OUT nextRecord(OUT record) throws IOException {
+   OUT returnRecord = null;
+   do {
+   returnRecord = super.nextRecord(record);
+   } while (returnRecord == null && !reachedEnd());
+
+   return returnRecord;
+   }
+
+   public Class<?>[] getFieldTypes() {
+   return super.getGenericFieldTypes();
+   }
+
+   protected static boolean[] createDefaultMask(int size) {
--- End diff --

*cover it in an obvious manner


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2692] Untangle CsvInputFormat

2015-11-11 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1266#issuecomment-155764846
  
@fhueske I've addressed your comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2441] Introduce Python OperationInfo

2015-11-13 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/1352

[FLINK-2441] Introduce Python OperationInfo

Introduces an OperationInfo object to the Python API, replacing the 
previously used dictionary. 

This is mostly a cosmetic change, making code shorter (and IMO more 
readable) within DataSet and ExecutionEnvironment and making the general 
structure more similar to the Java side. 

Furthermore all fields are initialized with a default value.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 2441_pyopinfo

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1352.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1352


commit ce23a695355db363c96eeec5c639b0597f802f96
Author: zentol <s.mo...@web.de>
Date:   2015-07-24T18:18:47Z

[FLINK-2441] Introduce Python OperationInfo

Introduces an OperationInfo object, replacing the previously used 
dictionary. This change generally makes related code shorter.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3012] Refactor boilerplate code in Data...

2015-11-15 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1355#issuecomment-156869041
  
I'm not too fond of the new version either, and feel like this code bit 
doesn't really need more clarity, simply because it isn't a lot of code.

Using a counter at all doesn't really make sense to me; effectively, you 
aren't counting anything. A boolean seems more intuitive. It is nit-picky, but 
we are going for clarity here after all :)
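
A trivial sketch of the style point:

    public class FlagVsCounter {
        public static void main(String[] args) {
            // When only "did it happen at least once" matters, a boolean
            // flag states the intent directly; a counter suggests the
            // count itself is used somewhere.
            boolean seen = false;
            for (int value : new int[]{1, 2, 3}) {
                if (value == 2) {
                    seen = true;
                    break;
                }
            }
            System.out.println(seen);
        }
    }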



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2914] Add missing break Statement in ZK...

2015-11-16 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/1359

[FLINK-2914] Add missing break Statement in ZKJobGraphStore



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 2914_break

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1359.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1359


commit e54198f479f8366288c432ae77de54b3683416c7
Author: zentol <ches...@apache.org>
Date:   2015-11-16T10:35:28Z

[FLINK-2914] Add missing break Statement in ZKJobGraphStore




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2901] Remove Record API dependencies fr...

2015-11-17 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1306#issuecomment-157382048
  
@fhueske I've addressed most of your concerns.

Things that still need work / clarification:
* PreviewPlanDumpTest was previously executed with 2 different sets of 
arguments, now only with 1. Should this be changed back to the previous 
behaviour? The arguments affect the paths for sources/sinks, the parallelism, 
and the number of iterations.
* test.classloading.jar.KMeansForTest appears to be a good replacement for 
the IterativeKMeansITCase; what's your take on that?
* The removed delta iteration PageRank program looks very similar to the 
ConnectedComponents implementation under flink-examples. I don't think this 
needs a separate port.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2901] Remove Record API dependencies fr...

2015-11-10 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1306#discussion_r44392775
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
 ---
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-@SuppressWarnings("deprecation")
-public class IterationTerminationWithTwoTails extends RecordAPITestBase {
-
-   private static final String INPUT = "1\n" + "2\n" + "3\n" + "4\n" + "5\n";
-   private static final String EXPECTED = "22\n";
-
-   protected String dataPath;
-   protected String resultPath;
-
-   public IterationTerminationWithTwoTails(){
-   setTaskManagerNumSlots(parallelism);
-   }
-
-   @Override
-   protected void preSubmit() throws Exception {
-   dataPath = createTempFile("datapoints.txt", INPUT);
-   resultPath = getTempFilePath("result");
-   }
-   
-   @Override
-   protected void postSubmit() throws Exception {
-   compareResultsByLinesInMemory(EXPECTED, resultPath);
-   }
-
-   @Override
-   protected Plan getTestJob() {
-   return getTestPlanPlan(parallelism, dataPath, resultPath);
-   }
-   
-   private static Plan getTestPlanPlan(int numSubTasks, String input, String output) {
-
-   FileDataSource initialInput = new FileDataSource(TextInputFormat.class, input, "input");
-   
-   BulkIteration iteration = new BulkIteration("Loop");
-   iteration.setInput(initialInput);
-   iteration.setMaximumNumberOfIterations(5);
-   Assert.assertTrue(iteration.getMaximumNumberOfIterations() > 1);
-
-   ReduceOperator sumReduce = ReduceOperator.builder(new SumReducer())
-   .input(iteration.getPartialSolution())
-   .name("Compute sum (Reduce)")
-   .build();
-   
-   iteration.setNextPartialSolution(sumReduce);
-   
-   MapOperator terminationMapper = MapOperator.builder(new TerminationMapper())
-   .input(iteration.getPartialSolution())
--- End diff --

so THAT was the difference, will port it!




[GitHub] flink pull request: [FLINK-2901] Remove Record API dependencies fr...

2015-11-10 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1306#discussion_r44393132
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java ---
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.testdata.KMeansData;
-import org.apache.flink.test.util.RecordAPITestBase;
-
-
-public class IterativeKMeansITCase extends RecordAPITestBase {
--- End diff --

does the KMeansForTest qualify for this?




[GitHub] flink pull request: [FLINK-2692] Untangle CsvInputFormat

2015-10-30 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1266#issuecomment-152584371
  
We could move the differing lines (Csv:L114 Pojo:L218->L225) into a separate 
method that is called from a generic readRecord() method, something like 
fillRecord(OUT reuse, Object[] parsedValues).
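
A hedged sketch of that shape (the class and field names here are assumed; 
only readRecord() and the fillRecord() signature come from the suggestion 
above):

    public abstract class GenericCsvFormat<OUT> {

        // filled by the shared parsing logic for both the Tuple and POJO variants
        protected Object[] parsedValues;

        public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) {
            // ...shared line splitting and field parsing into parsedValues (omitted)...
            return fillRecord(reuse, parsedValues);
        }

        // the only part that differs: the Tuple variant would set fields on the
        // reuse instance, the POJO variant would assign them via reflection
        protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
    }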




[GitHub] flink pull request: [FLINK-2851] Move PythonAPI to flink-libraries

2015-10-14 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/1257

[FLINK-2851] Move PythonAPI to flink-libraries



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink _move_libraries

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1257.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1257


commit f9d108973c3c01f56be557cf056f193a69728e75
Author: zentol <s.mo...@web.de>
Date:   2015-10-14T14:49:31Z

[FLINK-2851] Move PythonAPI to flink-libraries






[GitHub] flink pull request: [FLINK-2692] Untangle CsvInputFormat

2015-10-18 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/1266

[FLINK-2692] Untangle CsvInputFormat

This PR splits the CsvInputFormat into a Tuple and a POJO version. To this 
end, the (Common)CsvInputFormat classes were merged, and the type-specific 
portions were refactored into separate classes.

Additionally, the ScalaCsvInputFormat has been removed; the Java and Scala 
APIs now use the same InputFormats. Previously, the formats differed in the 
way they created the output tuples; this is now realized in a newly introduced 
abstract method "createOrReuseInstance(Object[] fieldValues, T reuse)" within 
the TupleSerializerBase.
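
A rough sketch of the idea with simplified, hypothetical pair types (the real 
method is generic and lives in TupleSerializerBase): mutable Java tuples are 
filled in place, while immutable Scala tuples force a fresh instance.

    abstract class PairSerializer<T> {
        abstract T createOrReuseInstance(Object[] fields, T reuse);
    }

    // Java-API style: the tuple is mutable, so the reuse instance is filled in place.
    class MutablePairSerializer extends PairSerializer<MutablePair> {
        @Override
        MutablePair createOrReuseInstance(Object[] fields, MutablePair reuse) {
            reuse.first = fields[0];
            reuse.second = fields[1];
            return reuse;
        }
    }

    // Scala-API style: the tuple is immutable, so 'reuse' is ignored.
    class ImmutablePairSerializer extends PairSerializer<ImmutablePair> {
        @Override
        ImmutablePair createOrReuseInstance(Object[] fields, ImmutablePair reuse) {
            return new ImmutablePair(fields[0], fields[1]);
        }
    }

    class MutablePair { Object first; Object second; }

    class ImmutablePair {
        final Object first;
        final Object second;
        ImmutablePair(Object first, Object second) { this.first = first; this.second = second; }
    }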

Fields to include and field names are no longer passed via setters, but 
instead via the constructor. Several new constructors were added to 
accommodate different use cases, along with two new static methods to generate 
a default include mask or to convert an int[] list of field indices to a 
boolean include mask.
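
A hedged sketch of what those two helpers could look like (method names 
assumed): one produces an all-true default mask, the other turns an int[] 
list of field indices into a boolean include mask, leaving gaps as false.

    class IncludeMasks {

        static boolean[] createDefaultMask(int size) {
            boolean[] mask = new boolean[size];
            for (int i = 0; i < size; i++) {
                mask[i] = true; // by default every field is included
            }
            return mask;
        }

        static boolean[] toBooleanMask(int[] sourceFieldIndices) {
            int max = 0;
            for (int index : sourceFieldIndices) {
                max = Math.max(max, index);
            }
            boolean[] mask = new boolean[max + 1];
            for (int index : sourceFieldIndices) {
                mask[index] = true; // fields between the given indices stay false, i.e. skipped
            }
            return mask;
        }
    }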

Classes no longer have to be passed separately, as they are extracted from 
the TypeInformation object.

A few sanity checks were moved from the ExecEnvironment to the InputFormat.

The testReadSparseWithShuffledPositions test was removed, since a monotonic 
order of field indices is, and afaik was, not actually necessary due to the 
way the indices are converted to a boolean[].

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 2692_csv

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1266.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1266


commit d497415adc2e58b4e9912ae89a53444825416366
Author: zentol <s.mo...@web.de>
Date:   2015-10-18T18:23:23Z

[FLINK-2692] Untangle CsvInputFormat






  1   2   3   4   5   6   7   8   9   10   >