[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/339#issuecomment-71489135 But that is exactly what is changing: both the delete and the copy process are synchronized on the same object. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/339#issuecomment-71490079 Oh, I see what you mean: maybe extend the synchronized block to include the actual delete stuff. Yup, that's a good idea. All I know is that I tried it without the change and ran into issues; with the change it ran.
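The fix being discussed can be sketched in miniature. This is an illustrative model only, not Flink's actual DistributedCache code, and all names are made up; the point is that the copy step and the check-then-delete step hold the same lock, so a delete cannot run halfway through a copy:

```python
import threading

# Toy model of the copy/delete race (all names hypothetical):
# copy and delete synchronize on the same lock, and the existence
# check happens inside the same synchronized block as the removal.

cache = {}                      # entry name -> file contents
lock = threading.Lock()         # the single shared monitor

def copy_file(name, contents):
    with lock:                  # same lock as delete_file
        cache[name] = contents

def delete_file(name):
    with lock:                  # extended block: check + delete are atomic
        if name in cache:
            del cache[name]

copy_file("dist.jar", b"\x00\x01")
delete_file("dist.jar")
assert "dist.jar" not in cache
```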
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-72652471 Hmm... you are now the second person to report that creating the tmp files does not work on OS X. I don't know why that doesn't work; the file creation is done from Java, is there any magic required there? I can't debug OS X errors myself at the moment; all I can do on that front is add sanity checks for better error reporting. The included triangle enumeration is kinda odd: even if it runs, the output is empty. I've already checked the implementation yesterday and it appears equal to the Java counterpart; will give it another go. The plan execution is one of the more fragile parts. Generally, when the process exits with an error it is noticed. But if for example something is missing (like the call to execute) things just get stuck. This is due to the fact that information is only ever sent to Java, but never received: a complete one-way street. Since neither accumulators nor actions were supposed to be implemented anytime soon this seemed appropriate, but it seems that requires a change already. Some timeouts could be useful as well. @fhueske Thanks for trying it out!
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/202#discussion_r23955578 --- Diff: docs/python_programming_guide.md --- @@ -0,0 +1,600 @@ +--- +title: Python Programming Guide +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +* This will be replaced by the TOC +{:toc} + + +<a href="#top"></a> + +Introduction +--- + +Analysis programs in Flink are regular programs that implement transformations on data sets +(e.g., filtering, mapping, joining, grouping). The data sets are initially created from certain +sources (e.g., by reading files, or from collections). Results are returned via sinks, which may for +example write the data to (distributed) files, or to standard output (for example the command line +terminal). Flink programs run in a variety of contexts, standalone, or embedded in other programs. +The execution can happen in a local JVM, or on clusters of many machines. + +In order to create your own Flink program, we encourage you to start with the +[program skeleton](#program-skeleton) and gradually add your own +[transformations](#transformations). The remaining sections act as references for additional +operations and advanced features.
+
+Example Program
+---
+
+The following program is a complete, working example of WordCount. You can copy & paste the code
+to run it locally.
+
+{% highlight python %}
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import INT, STRING
+from flink.functions.GroupReduceFunction import GroupReduceFunction
+
+class Adder(GroupReduceFunction):
+  def reduce(self, iterator, collector):
+    count, word = iterator.next()
+    count += sum([x[0] for x in iterator])
+    collector.collect((count, word))
+
+if __name__ == "__main__":
+  env = get_environment()
+  data = env.from_elements("Who's there?",
+   "I think I hear them. Stand, ho! Who's there?")
+
+  data \
+    .flat_map(lambda x: x.lower().split(), (INT, STRING)) \
+    .group_by(1) \
+    .reduce_group(Adder(), (INT, STRING), combinable=True) \
+    .output()
+
+  env.execute()
+{% endhighlight %}
--- End diff -- fixed
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/202#discussion_r23958310 --- Diff: flink-addons/flink-language-binding/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java --- @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.flink.languagebinding.api.java.common; + +/** + * Container for all generic information related to operations. This class contains the absolute minimum fields that are + * required for all operations. This class should be extended to contain any additional fields required on a + * per-language basis. + */ +public abstract class OperationInfo { + public int parentID; //DataSet that an operation is applied on + public int otherID; //secondary DataSet + public int setID; //ID for new DataSet + public int[] keys1; //grouping keys + public int[] keys2; //grouping keys + public int[] projectionKeys1; //projection keys + public int[] projectionKeys2; //projection keys + public Object types; //an object that is of the same type as the output type --- End diff -- transient field is a good idea...
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/202#discussion_r23954389 --- Diff: flink-addons/flink-language-binding/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java --- @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.flink.languagebinding.api.java.common; + +/** + * Container for all generic information related to operations. This class contains the absolute minimum fields that are + * required for all operations. This class should be extended to contain any additional fields required on a + * per-language basis. + */ +public abstract class OperationInfo { + public int parentID; //DataSet that an operation is applied on + public int otherID; //secondary DataSet + public int setID; //ID for new DataSet + public int[] keys1; //grouping keys + public int[] keys2; //grouping keys + public int[] projectionKeys1; //projection keys + public int[] projectionKeys2; //projection keys + public Object types; //an object that is of the same type as the output type --- End diff -- yes, that would be nicer, but last time I tried that I got a NotSerializableException due to the TypeInformation.
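The transient-field suggestion can be sketched with Python's pickle standing in for Java serialization. The lock used as a stand-in for the non-serializable TypeInformation is an assumption for illustration, not the real type:

```python
import pickle
import threading

# A field holding an unpicklable value (a lock, standing in for
# TypeInformation) breaks serialization unless it is excluded from the
# serialized state, which is roughly what Java's `transient` does.

class OperationInfo:
    def __init__(self):
        self.parent_id = 1
        self.types = threading.Lock()   # hypothetical non-serializable field

    def __getstate__(self):
        state = self.__dict__.copy()
        state.pop("types")              # the "transient" treatment
        return state

info = pickle.loads(pickle.dumps(OperationInfo()))
assert info.parent_id == 1              # ordinary fields survive the round trip
assert not hasattr(info, "types")       # the excluded one does not
```

Without `__getstate__`, pickling the object raises a TypeError because of the lock, which mirrors the NotSerializableException described above.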
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-72537053 "hard to find error messages": what do you mean by that? What did you run the job with (data size, dop)?
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-72744279 About error messages going to the command line: the only way I see for that to work is by wrapping the complete error message into an exception, since those do show up on the command line. WC deadlock: I just can't reproduce it. I tried small files (4 words) and went up to 750mb with dop=1. Can you send me the test data you used? @qmlmoon THANK YOU! That would have taken me ages to figure out. Working on a fix right now.
[GitHub] flink pull request: [FLINK-1422] Add withParameters() to documenta...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/350#discussion_r24168173 --- Diff: docs/programming_guide.md --- @@ -2398,6 +2399,61 @@ of a function, or use the `withParameters(...)` method to pass in a configuratio [Back to top](#top) +Passing parameters to functions +--- + +Parameters can be passed to rich functions using either the constructor (if the function is --- End diff -- Ok, so which requirements exist for the operator to be serializable? Or can I just omit those details and say something like "parameters can be stored in non-transient fields if the function is serializable"?
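The point under discussion, that parameters stored in plain serializable fields travel with the function when it is shipped to the workers, can be sketched with a pickle round trip; the class and field names here are hypothetical, not Flink's API:

```python
import pickle

# A "rich function" carrying its parameter in an ordinary field: the
# field is serialized along with the function object, so it is available
# again after deserialization on the worker. (All names are made up.)

class FilterByLimit:
    def __init__(self, limit):
        self.limit = limit          # non-transient field, set via constructor

    def filter(self, value):
        return value > self.limit

# simulate shipping the function to a worker and using it there
shipped = pickle.loads(pickle.dumps(FilterByLimit(2)))
assert shipped.limit == 2
assert [x for x in [1, 2, 3, 4] if shipped.filter(x)] == [3, 4]
```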
[GitHub] flink pull request: [FLINK-785] Chained AllReduce
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/370 [FLINK-785] Chained AllReduce This is a preliminary PR to see whether I'm on the right track. I'm wondering whether this would be everything needed to add a chained AllReduce before I continue with this issue. I tried it out and it appears to work, but wanted to make sure. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/incubator-flink chained_all_reduce Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/370.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #370 commit 37b77670b517995498f83df452ed5b20754fc63e Author: zentol s.mo...@web.de Date: 2015-02-06T13:38:05Z [FLINK-785] Chained AllReduce
[GitHub] flink pull request: [FLINK-785] Chained AllReduce
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/370#issuecomment-73404704 there's something funky going on with the tests here. I got 2 failing tests in ObjectReuseITCase:
```
ObjectReuseITCase>JavaProgramTestBase.testJobWithoutObjectReuse:168->postSubmit:68->TestBaseUtils.compareResultsByLinesInMemory:223->TestBaseUtils.compareResultsByLinesInMemory:238 arrays first differed at element [0]; expected:<a,[10]0> but was:<a,[6]0>
ObjectReuseITCase>JavaProgramTestBase.testJobWithObjectReuse:120->postSubmit:68->TestBaseUtils.compareResultsByLinesInMemory:223->TestBaseUtils.compareResultsByLinesInMemory:238 arrays first differed at element [0]; expected:<a,[10]0> but was:<a,[6]0>
```
These two tests verify the incorrect behaviour that occurs when object reuse is enabled but not accounted for. I thought this was generally treated as undefined behaviour; why are there tests for that? The other 2 tests fail with a NullPointerException when accessing the expected result:
```
ClosureCleanerITCase.after:52->TestBaseUtils.compareResultsByLinesInMemory:223->TestBaseUtils.compareResultsByLinesInMemory:234 » NullPointer
ClosureCleanerITCase.after:52->TestBaseUtils.compareResultsByLinesInMemory:223->TestBaseUtils.compareResultsByLinesInMemory:234 » NullPointer
```
I can't figure out why this occurs.
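Why enabled-but-unhandled object reuse counts as undefined behaviour can be shown with a small sketch (not Flink code): the runtime hands the same mutable record to user code for every element, so a function that keeps references sees only the last value.

```python
# Minimal model of object reuse: one container object is refilled for
# each input element. Keeping references yields wrong results; copying
# defensively yields the right ones.

def records_with_reuse(rows):
    record = [None]                 # the single reused record object
    for row in rows:
        record[0] = row
        yield record

kept = list(records_with_reuse([1, 2, 3]))                 # holds references
copied = [list(r) for r in records_with_reuse([1, 2, 3])]  # copies each record

assert kept == [[3], [3], [3]]      # every entry aliases the reused record
assert copied == [[1], [2], [3]]    # copies preserve each element
```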
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-72752968 @qmlmoon sweet. @rmetzger errors should show up on the console now. And in the .out file. And I suppose, by extension, in the .log file as well.
[GitHub] flink pull request: [FLINK-1398] Introduce extractSingleField() in...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/308#issuecomment-70413423 @StephanEwen would that look like this?
```java
reverse(dataset.map(...)).filter(...)
```
I've been thinking about the API overload issue a bit: could a DataSet have a field that offered more functions? Something like:
```java
dataset.map().ext.reverse().filter()
```
[GitHub] flink pull request: [FLINK-785] Chained AllReduce
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/370#issuecomment-73923184 sorry for not making it clear that I force-pushed some changes; I forgot to make a separate branch. The changes you made to the driver are already in, sorry for wasting your time there. I will include the changes to the test case. The ChainedAllReduceDriver currently does not cause any problems. I've added a ChainedAllGroupReduceCombineDriver as well, which fails for one test case:
```
ReplicatingDataSourceITCase.after:69->TestBaseUtils.compareResultsByLinesInMemory:223->TestBaseUtils.compareResultsByLinesInMemory:238 arrays first differed at element [0]; expected:<([50050]0)> but was:<([37525]0)>
```
This may be related to FLINK-1521; once the newest test is complete I'll hopefully have a definite answer. When I force objectReuse in the global reduce, all tests pass.
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-73936205 in my case, it returns this:
```python
>>> import socket
>>> socket.gethostname()
'Linux'
>>> socket.getaddrinfo(socket.gethostname(), None, socket.AF_INET, socket.SOCK_DGRAM)
[(2, 2, 17, '', ('127.0.1.1', 0))]
```
127.0.1.1 is also present in my /etc/hosts. I assume they are different because no error is printed. I tried several different approaches, and when I got no (Python) error and only the timeout, further investigation showed that the resolved IP did not match 127.0.1.1.
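The mismatch described here can be checked up front rather than discovered as a silent timeout. A small sketch, assuming nothing about Flink's actual startup code, that compares what the hostname resolves to against the loopback address the other side binds to:

```python
import socket

# If gethostname() resolves (e.g. via /etc/hosts) to 127.0.1.1 while the
# peer listens on 127.0.0.1, connects hang instead of failing loudly.
# Printing the resolved addresses makes the mismatch visible.

LOOPBACK = "127.0.0.1"

def resolved_addresses(host):
    try:
        infos = socket.getaddrinfo(host, None, socket.AF_INET, socket.SOCK_DGRAM)
        return {info[4][0] for info in infos}
    except socket.gaierror:
        return set()

addrs = resolved_addresses(socket.gethostname())
if LOOPBACK not in addrs:
    print("warning: hostname resolves to %s, not %s" % (addrs, LOOPBACK))
```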
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-73945405 @mxm I've added another potential fix.
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-73731632 if you don't mind, try the example again and let it run. Both processes should time out after 5 minutes, throwing exceptions that hopefully point to the origin of the lock.
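The timeout idea mentioned here can be sketched with a plain socket (not the actual Java/Python channel): a deadline on the blocking read turns a silent deadlock into an exception that points at the stuck receive.

```python
import socket

# One side waits for data that never arrives; a read timeout converts
# the would-be hang into a catchable exception. The 0.1s deadline stands
# in for the 5-minute one discussed above.

server = socket.socket()
server.bind(("127.0.0.1", 0))
server.listen(1)

client = socket.socket()
client.connect(server.getsockname())
conn, _ = server.accept()

conn.settimeout(0.1)
try:
    conn.recv(1)                    # nothing is ever sent: the "deadlock"
    timed_out = False
except socket.timeout:
    timed_out = True

for s in (conn, client, server):
    s.close()

assert timed_out
```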
[GitHub] flink pull request: [FLINK-785] Chained AllReduce / AllGroupReduce...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/370#discussion_r24633778 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllGroupReduceCombineDriver.java --- @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators.chaining;
+
+import java.util.ArrayList;
+
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.RegularPactTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ChainedAllGroupReduceCombineDriver<T> extends ChainedDriver<T, T> {
+ private static final Logger LOG = LoggerFactory.getLogger(ChainedAllGroupReduceCombineDriver.class);
+
+ //
+ private FlatCombineFunction<T> combiner;
+ private TypeSerializer<T> serializer;
+
+ private volatile boolean running = true;
+
+ private final ArrayList<T> values = new ArrayList<T>();
+
+ //
+ @Override
+ public void setup(AbstractInvokable parent) {
+  @SuppressWarnings("unchecked")
+  final FlatCombineFunction<T> com = RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, FlatCombineFunction.class);
+  this.combiner = com;
+  FunctionUtils.setFunctionRuntimeContext(com, getUdfRuntimeContext());
+
+  this.objectReuseEnabled = this.executionConfig.isObjectReuseEnabled();
+
+  final TypeSerializerFactory<T> serializerFactory = this.config.getInputSerializer(0, this.userCodeClassLoader);
+  this.serializer = serializerFactory.getSerializer();
+
+  if (LOG.isDebugEnabled()) {
+   LOG.debug("ChainedAllGroupReduceCombineDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
+  }
+ }
+
+ @Override
+ public void openTask() throws Exception {
+  final Configuration stubConfig = this.config.getStubParameters();
+  RegularPactTask.openUserCode(this.combiner, stubConfig);
+ }
+
+ @Override
+ public void closeTask() throws Exception {
+  if (!this.running) {
+   return;
+  }
+  RegularPactTask.closeUserCode(this.combiner);
+ }
+
+ @Override
+ public void cancelTask() {
+  this.running = false;
+ }
+
+ //
+ @Override
+ public Function getStub() {
+  return this.combiner;
+ }
+
+ @Override
+ public String getTaskName() {
+  return this.taskName;
+ }
+
+ //
+ @Override
+ public void collect(T record) {
+  try {
+   values.add(objectReuseEnabled ? record : serializer.copy(record));
+   if (values.size() > 1) {
--- End diff -- is there a more reasonable value for this? (basically anything whose reasoning goes beyond "I felt like using 1")
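One alternative to the `values.size() > 1` threshold questioned above is flushing on a serialized-size budget, roughly what a fixed-memory combiner does; a sketch with invented sizes and names, not Flink's implementation:

```python
# Buffer records until their estimated serialized size crosses a budget,
# then pre-combine; both constants here are arbitrary stand-ins.

BUDGET_BYTES = 16
RECORD_BYTES = 4

class CombineBuffer:
    def __init__(self, combine):
        self.combine = combine
        self.values = []
        self.size = 0

    def collect(self, record):
        self.values.append(record)
        self.size += RECORD_BYTES
        if self.size > BUDGET_BYTES:        # flush on memory, not on count
            self.values = [self.combine(self.values)]
            self.size = RECORD_BYTES

buf = CombineBuffer(sum)
for _ in range(10):
    buf.collect(1)
assert sum(buf.values) == 10                # pre-combining preserves the sum
```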
[GitHub] flink pull request: [FLINK-1521] Chained operators respect reuse
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/392 [FLINK-1521] Chained operators respect reuse You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/incubator-flink flink-1521 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/392.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #392 commit b3f8ec9a107a2baf743468c9efdf28da9f49cf79 Author: zentol s.mo...@web.de Date: 2015-02-12T19:36:00Z [FLINK-1521] Chained operators respect reuse
[GitHub] flink pull request: [FLINK-785] Chained AllReduce / AllGroupReduce...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/370#issuecomment-74240805 the SynchronousChainedCombineDriver stores multiple records as well, doesn't it? Except up to a fixed size instead of a number of elements. Could we not do the same here?
[GitHub] flink pull request: [FLINK-785] Chained AllReduce / AllGroupReduce...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/370#issuecomment-75030486 okay, I will remove the chained combiner. I'm curious though: the issue description specifically mentions the AllGroupReduce greatly benefiting from a chained version. Did (or do) you have a specific approach in mind?
[GitHub] flink pull request: [FLINK-1521] Chained operators respect reuse
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/392#issuecomment-75029399 Alright, I'm closing this PR then.
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-71611872 Tests run on travis (they don't right now because fabian merged something that changes the CSVInputFormat constructor, which breaks stuff on my end), but see: https://travis-ci.org/zentol/incubator-flink/jobs/48334902 and search for "Running org.apache.flink.languagebinding.api.java.python.PythonPlanBinderTest". Putting it under flink-python means splitting it from the generic interface, right? That would be necessary in the long run anyway, so I'm all for it. @dan-blanchard the generic interface is not just for python. It does reduce the amount of code you have to write in Java by a pretty high amount, but it sets up some requirements, most prominently support for binary data, memory-mapped files and sockets, though it would be possible to provide different options here. It is difficult for me to assess how difficult it would be; the generic and python parts were coded and evolved simultaneously, and when something didn't fit I could just change it to do so. I think it's very likely that when someone wants to add another language we'll have to revisit a few things, but it provides a good starting point.
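The memory-mapped-file requirement mentioned here can be illustrated with Python's mmap module; a single-process sketch of length-prefixed binary exchange through a shared mapping, with the file name and framing format made up for the example:

```python
import mmap
import os
import struct
import tempfile

# Both "sides" map the same fixed-size file and exchange length-prefixed
# binary records through it; here one process plays both roles.

path = os.path.join(tempfile.mkdtemp(), "plan_channel")
with open(path, "wb") as f:
    f.write(b"\x00" * 1024)                        # pre-size the channel file

with open(path, "r+b") as f:
    buf = mmap.mmap(f.fileno(), 1024)

    payload = b"map:lambda"
    buf[0:4] = struct.pack(">i", len(payload))     # big-endian length prefix
    buf[4:4 + len(payload)] = payload              # the record itself

    # the reading side decodes the same mapping
    n = struct.unpack(">i", buf[0:4])[0]
    received = bytes(buf[4:4 + n])
    buf.close()

assert received == b"map:lambda"
```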
[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/339#issuecomment-71825712 well you sure know how to keep me busy :) you are right about moving it back. Updated.
[GitHub] flink pull request: [FLINK-1419] [runtime] DC properly synchronize...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/339 [FLINK-1419] [runtime] DC properly synchronized Addresses the issue of files not being preserved in subsequent operations. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/incubator-flink dc_cache_fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/339.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #339 commit 5c9059d3ce58d8415ce374927dd253579a5fd741 Author: zentol s.mo...@web.de Date: 2015-01-26T10:07:53Z [FLINK-1419] [runtime] DC properly synchronized
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-73505531 @qmlmoon has provided TPCH Query 3 / 10 and WebLogAnalysis examples
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/202#discussion_r26102129 --- Diff: flink-addons/flink-language-binding/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py --- @@ -0,0 +1,247 @@ + --- End diff -- license issue is still unresolved.
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-77140113 and here I thought I was being clever by swapping to built-in functions. Addressed both issues.
[GitHub] flink pull request: [FLINK-1422] Add withParameters() to documenta...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/350#issuecomment-72105374 gotcha, I'll address the points mentioned.
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-92765309 Now uses TCP to exchange signals.
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94403584 yes that is correct.
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94397131 All issues that I'm aware of are resolved.
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94402062 @aljoscha Timeout is removed. Data transfer is still done with a mapped file; access to these files is synchronized using TCP. I'm not sure what you mean with your last sentence.
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94771487 nah I'll do it.
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94760254 @rmetzger Done. Unless you want me to merge commits as well.
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94774209 Done
[GitHub] flink pull request: [FLINK-1924] Minor Refactoring
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/616

[FLINK-1924] Minor Refactoring

This PR resolves a few minor issues, including:
- formatting
- simpler python process initialization
- renaming of the python connection following the switch to tcp

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink python_refactor2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/616.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #616

commit 81d4c197bab6de4fb45f19ba2ca06f1f042c1812
Author: zentol s.mo...@web.de
Date: 2015-03-27T10:53:23Z

    [FLINK-1924] Minor Refactoring
[GitHub] flink pull request: [FLINK-1927] [py] Operator distribution rework
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/638#issuecomment-97389037 I didn't check performance; it shouldn't have any noticeable effect on it.
[GitHub] flink pull request: [FLINK-1927] [py] Operator distribution rework
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/638#issuecomment-97417603 @aljoscha that variable must be declared somewhere within the plan file. During the plan rebuild this would be executed as well, so I don't think this is a problem. In fact, I think this wouldn't work *before* this PR.
[GitHub] flink pull request: [FLINK-1927] [py] Operator distribution rework
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/638#issuecomment-97418740 oh snap, I just noticed a big flaw... well, let's put this PR on hold for a bit. I'm simply re-executing the plan file on each node, but forgot to deal with arguments that were passed to the file -.-
[GitHub] flink pull request: [FLINK-1927] [py] Operator distribution rework
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/638

[FLINK-1927] [py] Operator distribution rework

Python operators are no longer serialized and instead rebuilt on each node. This also means that the dill library is no longer necessary.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink papipr_operator

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/638.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #638

commit b587a50b6ca8564a9e246e2d58d1c0cee125fdca
Author: zentol s.mo...@web.de
Date: 2015-04-19T08:07:38Z

    [FLINK-1927] [py] Operator distribution rework
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-90859901 I can't say for sure whether the current timeout is enough; we don't have enough data for that. We could make it configurable; that way a user can just increase it if it occurs without the job actually deadlocking.
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-90865196 I guess it's nice to have in case the Java side dies *somehow* without calling close() on the Java function.
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-90844274 The python process is now being terminated using kill -9, instead of process.destroy(). This should prevent python processes from lingering indefinitely.
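[Editor's note] A minimal sketch of the behaviour described above, assuming a generic JVM child process rather than the actual Flink code: since Java 8, `Process.destroyForcibly()` maps to SIGKILL on POSIX systems (the equivalent of kill -9), whereas `destroy()` sends SIGTERM, which a stuck process may ignore. The `sleep 60` child here is a stand-in for the Python worker.

```java
import java.util.concurrent.TimeUnit;

public class ForceKill {
    public static void main(String[] args) throws Exception {
        // spawn a long-running child process (stand-in for the Python worker)
        Process p = new ProcessBuilder("sleep", "60").start();
        // destroyForcibly() (Java 8+) sends SIGKILL on POSIX, like kill -9,
        // so the child cannot linger the way a SIGTERM'd process might
        p.destroyForcibly();
        p.waitFor(5, TimeUnit.SECONDS);
        System.out.println("alive: " + p.isAlive()); // prints "alive: false"
    }
}
```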
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-90921879 The timeout measures how long Java or Python are stuck in a blocking UDP operation. This generally means how long it takes for the Python side to compute one chunk of data. The Java side sends a chunk, and then waits for the next signal; if this takes too long, timeout. It's quite fickle to be honest, and without a regular heartbeat one can always construct scenarios where it will break the job. I *think* TCP would cover it. Thanks for the hint to remove the shutdown hook; addressed.
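[Editor's note] One standard way to bound such a blocking wait on the Java side is a socket read timeout. The sketch below is illustrative only (not the code from this PR): the reader sets `setSoTimeout` and gives up when the peer never sends its "chunk done" signal.

```java
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class SignalTimeout {
    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket(0);
             Socket client = new Socket("localhost", server.getLocalPort());
             Socket peer = server.accept()) {
            client.setSoTimeout(200); // give up if no signal arrives within 200 ms
            InputStream in = client.getInputStream();
            try {
                in.read(); // the peer never writes, so this read blocks...
                System.out.println("signal received");
            } catch (SocketTimeoutException e) {
                // ...until the SO_TIMEOUT fires
                System.out.println("timed out waiting for signal");
            }
        }
    }
}
```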
[GitHub] flink pull request: [FLINK-1927] [py] Operator distribution rework
Github user zentol closed the pull request at: https://github.com/apache/flink/pull/638
[GitHub] flink pull request: [FLINK-1927] [py] Operator distribution rework
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/638#issuecomment-103813351 Implementing this in a clean way has become trickier than I initially expected, so I'll postpone it and close this PR for now.
[GitHub] flink pull request: [FLINK-2448]Clear cache file list in Execution...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1031#issuecomment-131769530 Does this work properly with collect() calls? As in, would the following plan still work?

```
env = ..
env.registerCacheFile()
...
someSet.collect()
doSomethingThatUsesTheCacheFile
env.execute()
```

If we wipe all cache entries in the collect() call, the files will not be registered in the execute(), right? The plans these methods create are separate, I think.
[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1017#discussion_r37143081

--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---
@@ -27,114 +26,65 @@
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {

-	private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
-
 	private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
+
+	private volatile boolean running = true;

 	@Override
-	public void registerInputOutput() {
-		try {
-			super.registerInputOutput();
+	public void init() throws Exception {
+		TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+		TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);

-			TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
-			TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+		int numberOfInputs = configuration.getNumberOfInputs();

-			int numberOfInputs = configuration.getNumberOfInputs();
+		ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+		ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();

-			ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
-			ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+		List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);

-			List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-
-			for (int i = 0; i < numberOfInputs; i++) {
-				int inputType = inEdges.get(i).getTypeNumber();
-				InputGate reader = getEnvironment().getInputGate(i);
-				switch (inputType) {
-					case 1:
-						inputList1.add(reader);
-						break;
-					case 2:
-						inputList2.add(reader);
-						break;
-					default:
-						throw new RuntimeException("Invalid input type number: " + inputType);
-				}
+		for (int i = 0; i < numberOfInputs; i++) {
+			int inputType = inEdges.get(i).getTypeNumber();
+			InputGate reader = getEnvironment().getInputGate(i);
+			switch (inputType) {
+				case 1:
+					inputList1.add(reader);
+					break;
+				case 2:
--- End diff --

I don't think that's necessary, as it is just an index starting at 1. The possible values 1 and 2 are clearly related to which inputList the reader is added to.
[GitHub] flink pull request: [FLINK-2530]optimize equal() of AcknowledgeChe...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1024#issuecomment-131512254 I don't think these statements are equivalent. Assume that this.state == null and that.state != null. In the original version we evaluate that.state == null, which is false, so the overall result is false. In your version we would evaluate (this.state == null || this.state.equals(that.state)), which is true, making the overall result true. Unless I made a mistake, -1.
[GitHub] flink pull request: [FLINK-2530]optimize equal() of AcknowledgeChe...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1024#issuecomment-131534953 Looking at the pure logic this would work, but you can't remove that.state != null since that could result in a NullPointerException inside equals.
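[Editor's note] The difference between the two null checks can be seen in a small sketch. The class and method names below are hypothetical stand-ins for the two variants of AcknowledgeCheckpoint's equals() being discussed, not the actual Flink code.

```java
public class EqualsNullCheck {
    // original variant: equal when both are null, or both non-null and equal
    static boolean original(Object a, Object b) {
        return a == null ? b == null : a.equals(b);
    }

    // proposed simplification from the PR, which drops the "b == null" check
    static boolean proposed(Object a, Object b) {
        return a == null || a.equals(b);
    }

    public static void main(String[] args) {
        // counterexample from the review: this.state == null, that.state != null
        System.out.println(original(null, "s")); // false
        System.out.println(proposed(null, "s")); // true -- the behavioral difference
    }
}
```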
[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/970#discussion_r37144152

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -897,7 +897,7 @@ class TaskManager(
       config.timeout,
       libCache,
       fileCache,
-      runtimeInfo)
+      new TaskRuntimeInfo(hostname, taskManagerConfig, tdd.getAttemptNumber))
--- End diff --

Generally we try to keep one PR for one issue; exceptions should only be made for closely related issues. Why did you decide to add these issues into this PR? (I have a hard time understanding it, since the commits barely touch the same files.)
[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/970#discussion_r37144275

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---
@@ -897,7 +897,7 @@ class TaskManager(
       config.timeout,
       libCache,
       fileCache,
-      runtimeInfo)
+      new TaskRuntimeInfo(hostname, taskManagerConfig, tdd.getAttemptNumber))
--- End diff --

I would prefer if you opened a second PR once this is merged. The issues are not really related to each other; the 2nd commit was simply made based on the 1st commit. We would end up having two separate discussions in one PR, which I think is a bad idea.
[GitHub] flink pull request: [FLINK-2565] Support primitive Arrays as keys
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/1043

[FLINK-2565] Support primitive Arrays as keys

Adds a comparator and test for every primitive array type. Modifies the CustomType2 class in GroupingTest to retain a field with an unsupported type.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 2565_arrayKey

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1043.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1043

commit 7551a47e60186a91ecc1df364f1a3ae0c9474a3f
Author: zentol s.mo...@web.de
Date: 2015-08-23T13:36:47Z

    [FLINK-2565] Support primitive Arrays as keys
[GitHub] flink pull request: [FLINK-2557] TypeExtractor properly returns Mi...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/1045

[FLINK-2557] TypeExtractor properly returns MissingTypeInfo

This fix is not really obvious, so let me explain: getParameterType() is called from two different places in the TypeExtractor: to validate the input type and to extract the output type. Both cases consider the possibility that getParameterType() fails, but check for different exceptions. The TypeExtractor only returns a MissingTypeInfo if it encounters an InvalidTypesException; IllegalArgumentExceptions are not caught. This is what @mjsax encountered. Changing the exception type causes the TypeExtractor to properly return a MissingTypeInfo, which is later overridden by the returns(...) call. In order for the input validation to still work properly as well, it now catches InvalidTypesExceptions instead.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 2557_types

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1045.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1045

commit 1c1dc459915c875ab0a4412aa3ef0a844f092171
Author: zentol s.mo...@web.de
Date: 2015-08-23T19:41:44Z

    [FLINK-2557] TypeExtractor properly returns MissingTypeInfo
[GitHub] flink pull request: [FLINK-2556] Refactor/Fix pre-flight Key valid...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/1044

[FLINK-2556] Refactor/Fix pre-flight Key validation

- Removed redundant key validation in DistinctOperator
- Keys constructors now make sure the type of every key is an instance of AtomicType/CompositeType, and that type.isKeyType() is true
- Additionally, the ExpressionKeys int[] constructor explicitly rejects Tuple0
- Changes one test that actually tried something that shouldn't work in the first place

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink isKeyType_check

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1044.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1044

commit 7a57b6ef2ecdc7adaf770f8585cf8f974c684705
Author: zentol s.mo...@web.de
Date: 2015-08-23T13:57:34Z

    [FLINK-2556] Refactor/Fix pre-flight Key validation
[GitHub] flink pull request: [FLINK-2557] TypeExtractor properly returns Mi...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1045#issuecomment-134203619 InvalidTypesException (from here on abbreviated as ITE) is no longer unchecked. For this to work I had to make changes in surprisingly many classes, so let's break it down:
* In general, InvalidProgramExceptions are now thrown for unrecoverable conditions, where previously an ITE would be thrown.
* TypeExtractor methods that return a TypeInformation will never throw an ITE and instead return a MissingTypeInfo if it is allowed, otherwise an InvalidProgramException.
* One public TypeExtractor method now throws an ITE, which is getParameterType(). This method is heavily used in the TypeExtractor itself as well, so I didn't see a way to fix this in a non-API-breaking way that doesn't rely yet again on unchecked exceptions.
  * affects hadoop-compatibility functions, which now catch them
* A few operator classes caught ITEs in returns(TypeInformation ...) and then manually created a MissingTypeInfo. This now happens in the TypeExtractor directly.
* TypeInformation classes threw an ITE in getInfoFor(Class ...) if the given class didn't match the TypeInformation class. These have been changed to IllegalArgumentExceptions.
* DataSet/DataStream threw an ITE in getType() if the type was a MissingTypeInfo; changed to InvalidProgramException.
* similar issue in StreamExecutionEnvironment
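[Editor's note] As a generic illustration of the pattern described above (all class and method names here are hypothetical stand-ins, not the real TypeExtractor API): a checked extraction failure is converted into a "missing" placeholder result that a later returns(...) call can override, instead of propagating an unchecked exception to the user.

```java
// hypothetical stand-in for Flink's checked InvalidTypesException
class ExtractionFailedException extends Exception {}

class TypeInfoExtraction {
    // stand-in for a TypeExtractor method that may fail with a checked exception
    static String extract(boolean fail) throws ExtractionFailedException {
        if (fail) {
            throw new ExtractionFailedException();
        }
        return "ExtractedTypeInfo";
    }

    // caller converts the checked failure into a placeholder result,
    // which a later returns(...) call would override
    static String extractOrMissing(boolean fail) {
        try {
            return extract(fail);
        } catch (ExtractionFailedException e) {
            return "MissingTypeInfo"; // recoverable: user can still supply the type
        }
    }

    public static void main(String[] args) {
        System.out.println(extractOrMissing(false)); // ExtractedTypeInfo
        System.out.println(extractOrMissing(true));  // MissingTypeInfo
    }
}
```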
[GitHub] flink pull request: [FLINK-2556] Refactor/Fix pre-flight Key valid...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1044#discussion_r37766287

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java ---
@@ -209,6 +209,9 @@ public ExpressionKeys(int[] groupingFields, TypeInformation<T> type, boolean all
 			throw new InvalidProgramException("Specifying keys via field positions is only valid " +
 					"for tuple data types. Type: " + type);
 		}
+		if (type.getArity() == 0) {
+			throw new InvalidProgramException("Tuple size must be greater than 0. Size: " + type.getArity());
--- End diff --

ahh.. it can be a key, but you can't group on a DataSet<Tuple0>.
[GitHub] flink pull request: [FLINK-2556] Refactor/Fix pre-flight Key valid...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1044#discussion_r37766532

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java ---
@@ -209,6 +209,9 @@ public ExpressionKeys(int[] groupingFields, TypeInformation<T> type, boolean all
 			throw new InvalidProgramException("Specifying keys via field positions is only valid " +
 					"for tuple data types. Type: " + type);
 		}
+		if (type.getArity() == 0) {
+			throw new InvalidProgramException("Tuple size must be greater than 0. Size: " + type.getArity());
--- End diff --

this check was added specifically for Tuple0, as in the current code you'd get weird exceptions when grouping on a Tuple0.
[GitHub] flink pull request: [FLINK-2565] Support primitive Arrays as keys
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1043#issuecomment-134345186 I've added a test case to make sure a primitive array is accepted as a key. Is that what you had in mind, @tillrohrmann?
[GitHub] flink pull request: [FLINK-2448]Clear cache file list in Execution...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1031#issuecomment-132917505 Using clearSinks would cause the above example with collect() to fail, since this clears sinks as well. (The sinks are only not cleared when getExecutionPlan is called; it has nothing to do with a new execution.) Just checking the filepath wouldn't work in my case; I distribute a folder that resides in the same location whose content varies.
[GitHub] flink pull request: [FLINK-2077] [core] Rework Path class and add ...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1035#issuecomment-132481210 The changes I see are:
* removed hashCode()
* moved makeQualified to a new file as a static method
* reordered the remaining methods in Path

Is that about right?
[GitHub] flink pull request: [FLINK-2534][RUNTIME]Improve in CompactingHash...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1029#discussion_r37166847 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java --- @@ -223,7 +223,7 @@ public CompactingHashTable(TypeSerializer<T> buildSideSerializer, // check the size of the first buffer and record it. all further buffers must have the same size. // the size must also be a power of 2 this.segmentSize = memorySegments.get(0).size(); - if ( (this.segmentSize & (this.segmentSize - 1)) != 0) { + if ((this.segmentSize & (this.segmentSize - 1)) != 0) { --- End diff -- Please avoid tiny formatting changes; they just clutter up the diff.
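The segment-size check quoted above ("the size must also be a power of 2") uses the standard power-of-two bit trick; a minimal standalone sketch, with illustrative names rather than the Flink code:

```java
// A positive integer n is a power of two exactly when it has a single set
// bit, i.e. when n & (n - 1) == 0. Class and method names are illustrative.
public class PowerOfTwoCheck {
    static boolean isPowerOfTwo(int segmentSize) {
        return segmentSize > 0 && (segmentSize & (segmentSize - 1)) == 0;
    }
}
```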
[GitHub] flink pull request: [FLINK-2534][RUNTIME]Improve in CompactingHash...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1029#discussion_r37166816 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java --- @@ -406,9 +403,7 @@ public void insertOrReplaceRecord(T record) throws IOException { return; } } - else { - numInSegment++; - } + numInSegment++; --- End diff -- This is not equivalent to the old implementation: numInSegment is no longer incremented if this.buildSideComparator.equalToReference(valueAtPosition) is true, since we return.
[GitHub] flink pull request: [FLINK-2534][RUNTIME]Improve in CompactingHash...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1029#discussion_r37167787 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java --- @@ -406,9 +403,7 @@ public void insertOrReplaceRecord(T record) throws IOException { return; } } - else { - numInSegment++; - } + numInSegment++; --- End diff -- Ah, nevermind then.
[GitHub] flink pull request: [FLINK-2557] TypeExtractor properly returns Mi...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1045#issuecomment-134129643 I agree, will get right on it.
[GitHub] flink pull request: [FLINK-2565] Support primitive Arrays as keys
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1043#discussion_r37729559 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparator.java --- @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.flink.api.common.typeutils.base.array; + +import static java.lang.Math.min; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.base.BooleanComparator; + +public class BooleanPrimitiveArrayComparator extends PrimitiveArrayComparator<boolean[], BooleanComparator> { + public BooleanPrimitiveArrayComparator(boolean ascending) { + super(ascending, new BooleanComparator(ascending)); + } + + @Override + public int hash(boolean[] record) { + int result = 0; + for (boolean field : record) { + result += comparator.hash(field); + } + return result; + } + + @Override + public int compare(boolean[] first, boolean[] second) { + for (int x = 0; x < min(first.length, second.length); x++) { + int cmp = comparator.compare(first[x], second[x]); + if (cmp != 0) { + return cmp; --- End diff -- No, because the underlying comparator has already applied the ascending flag. It's always a headache thinking about the comparator logic :(
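The comparator semantics under discussion can be sketched in isolation. This is a hedged illustration (class and method names are mine, not the Flink classes): hash() sums the per-element hashes, and compare() walks both arrays up to the shorter length, returning the first non-zero element comparison, with equal prefixes ordered by length.

```java
// Illustrative sketch of a primitive-array comparator; in the real code the
// per-element comparator has already applied the ascending flag, which is
// why compare() can return its result unmodified.
public class BooleanArrayComparatorSketch {
    static int hash(boolean[] record) {
        int result = 0;
        for (boolean field : record) {
            result += Boolean.hashCode(field); // stands in for the element comparator's hash
        }
        return result;
    }

    static int compare(boolean[] first, boolean[] second) {
        for (int x = 0; x < Math.min(first.length, second.length); x++) {
            int cmp = Boolean.compare(first[x], second[x]);
            if (cmp != 0) {
                return cmp; // first differing element decides
            }
        }
        return first.length - second.length; // equal prefix: shorter array sorts first
    }
}
```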
[GitHub] flink pull request: [FLINK-1681] Remove Record API from jdbc modul...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/982 [FLINK-1681] Remove Record API from jdbc module You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 1681_jdbc Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/982.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #982 commit 6050f224235ed630fb5e5a6c57320eb9f4c7d272 Author: zentol s.mo...@web.de Date: 2015-08-04T10:45:22Z [FLINK-1681] Remove Record API from jdbc module
[GitHub] flink pull request: [FLINK-1882] Removed RemotedCollector
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/985 [FLINK-1882] Removed RemotedCollector You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 1882_remoteCollector Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/985.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #985 commit 5407804d550b143dcdf1b1fdb011e52f7555b983 Author: zentol s.mo...@web.de Date: 2015-07-25T13:17:55Z [FLINK-1882] Removed RemotedCollector
[GitHub] flink pull request: [FLINK-2483]Add default branch of switch(sched...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/984#issuecomment-127595762 This change is not necessary, imo. In both ExecutionGraph and JobGraph, scheduleMode is initialized to a proper ScheduleMode, and setScheduleMode requires a ScheduleMode as an argument, so there's no issue there either. The only possible problem I can see is passing null to setScheduleMode, but that's caught in the switch statement anyway.
[GitHub] flink pull request: [FLINK-2432] Custom serializer support
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/962#discussion_r36296145 --- Diff: flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java --- @@ -253,6 +257,18 @@ public ByteBuffer serialize(T value) { public abstract void serializeInternal(T value); } + private class CustomTypeSerializer extends Serializer<CustomTypeWrapper> { + public CustomTypeSerializer() { + super(0); + } + @Override + public void serializeInternal(CustomTypeWrapper value) { + byte[] bytes = value.getData(); + buffer = ByteBuffer.allocate(bytes.length); --- End diff -- Good point!
[GitHub] flink pull request: Serialized String comparison, Unicode support
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/4#issuecomment-128009698 Oh well, this has been a while now, let's see... From what I can tell, the comparison doesn't work on code points but compares single bytes, so it should be equivalent to the Java String comparison. It depends on the serialization code though, so it could not be merged without modifications. I can only guess why I decided to use code points during serialization; I assume it had to do with the length of the string. Since some Unicode chars are represented as 2 chars internally, just counting the chars written would lead to a wrong result. Here we are also dealing with a CharSequence, which doesn't hide the Unicode aspect like a String does.
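The surrogate-pair point made above can be shown in isolation: a character outside the Basic Multilingual Plane occupies two Java chars, so counting chars written during serialization would overstate the string's length.

```java
// U+1F600 (an emoji) is stored as a surrogate pair in UTF-16, so
// String.length() (chars) and codePointCount() (code points) disagree.
public class CodePointLength {
    public static void main(String[] args) {
        String s = "a\uD83D\uDE00"; // "a" followed by U+1F600
        System.out.println(s.length());                      // 3 UTF-16 chars
        System.out.println(s.codePointCount(0, s.length())); // 2 code points
    }
}
```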
[GitHub] flink pull request: [FLINK-2431] Refactor PlanBinder/OperationInfo
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/961 [FLINK-2431] Refactor PlanBinder/OperationInfo PlanBinder methods were restructured to make the class more readable. Keys are now stored as strings to simplify string-key-expression support. Parameter retrieval was moved from PlanBinder to the OperationInfo constructor, similar to PythonOperationInfo. This change reduces the clutter in PlanBinder and allows code reuse for operations with similar parameters. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 2431_pr Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/961.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #961 commit 908151add9f411309042c11091981dd577a3282d Author: zentol s.mo...@web.de Date: 2015-07-30T09:18:35Z [FLINK-2431] Refactor PlanBinder/OperationInfo PlanBinder methods were restructured to make the class more readable. Keys are now stored as strings to simplify string-key-expression support. Parameter retrieval was moved from PlanBinder to the OperationInfo constructor, similar to PythonOperationInfo. This change reduces the clutter in PlanBinder and allows code reuse for operations with similar parameters.
[GitHub] flink pull request: [FLINK-1927][py] Operator distribution rework
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/931#issuecomment-125983186 Thanks for the review @mxm. I've addressed the cosmetic issue you mentioned, and added a small fix for a separate issue as well (error reporting was partially broken).
[GitHub] flink pull request: [FLINK-2432] Custom serializer support
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/962#discussion_r36395811 --- Diff: flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_custom.py --- @@ -0,0 +1,76 @@ +# ### --- End diff -- Actually, I'm just gonna move this code into the test_main file, I was going to do that in my next test-centric PR anyway.
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-128309202 If you think it was necessary, why was your first step to remove its usage...
[GitHub] flink pull request: [FLINK-2494 ]Fix StreamGraph getJobGraph bug
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/998#issuecomment-128605743 I would assume that forceCheckpoint is supposed to do exactly that: enforce checkpointing regardless of whether it is supported. This change also means that if checkpointing is enabled, but not forced, the job will not hit an UnsupportedOperationException, which doesn't make any sense whatsoever. -1
[GitHub] flink pull request: [FLINK-2432] Custom serializer support
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/962#issuecomment-128681688 I've addressed the mentioned issues, and added a CUSTOM type constant, so that users don't have to constantly create new instances of their classes just to pass as a type parameter.
[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/970#discussion_r36512933 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java --- @@ -501,4 +536,22 @@ public int getSuperstepNumber() { return (T) previousAggregates.get(name); } } + + private static final class DoingNothing implements Callable<Path> { + private Path entry; + + public DoingNothing(Path entry) { + this.entry = entry; + } + + @Override + public Path call() throws IOException { + try { + LocalFileSystem fs = (LocalFileSystem) entry.getFileSystem(); + return entry.isAbsolute() ? new Path(entry.toUri().getPath()) : new Path(fs.getWorkingDirectory(), entry); + } catch (ClassCastException e) { + throw new RuntimeException("Collection execution must have only local file paths"); --- End diff -- I dislike this error message; there is no apparent relation to the distributed cache. How about "The DistributedCache only supports local files for Collection Environments."?
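The quoted Callable defers localizing a cache entry until it is first needed: a relative path is resolved against the working directory, an absolute path is used as-is. A minimal sketch of that resolution logic, using java.nio paths instead of Flink's Path class (all names here are illustrative):

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.Callable;

// Illustrative sketch: resolve a cache entry against a working directory,
// wrapping the work in a Callable so it only runs when the entry is accessed.
public class LocalPathResolver {
    static Path resolve(String entry, Path workingDirectory) {
        Path p = Paths.get(entry);
        return p.isAbsolute() ? p : workingDirectory.resolve(p);
    }

    static Callable<Path> resolveLazily(String entry, Path workingDirectory) {
        return () -> resolve(entry, workingDirectory); // deferred until call()
    }
}
```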
[GitHub] flink pull request: [FLINK-2458][FLINK-2449]Access distributed cac...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/970#discussion_r36512186 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java --- @@ -37,18 +37,17 @@ private final HashMap<String, Object> initializedBroadcastVars = new HashMap<String, Object>(); private final HashMap<String, List<?>> uninitializedBroadcastVars = new HashMap<String, List<?>>(); - - + public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, Map<String, Accumulator<?, ?>> accumulators) { - super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators); + this(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, + new HashMap<String, Future<Path>>(), accumulators); } - + public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?, ?>> accumulators) { super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators, cpTasks); } - --- End diff -- Here we have a few unnecessary formatting changes that just clutter the diff.
[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/992#issuecomment-128301542 If you remove that check, retryForever is unused and can be removed completely.
[GitHub] flink pull request: [FLINK-1927][py] Operator distribution rework
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/931 [FLINK-1927][py] Operator distribution rework Python operators are no longer serialized and shipped across the cluster. Instead, the plan file is executed on each node, followed by usage of the respective operator object. Removed the dill library. This also fixes [FLINK-2173] by always passing file paths explicitly to Python. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink python_operator4 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/931.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #931 commit 40fd3501cacb7b382c7265f0370d0f94887b7e85 Author: zentol s.mo...@web.de Date: 2015-07-21T19:22:19Z [FLINK-1927][py] Operator distribution rework Python operators are no longer serialized and shipped across the cluster. Instead, the plan file is executed on each node, followed by usage of the respective operator object. Removed the dill library. [FLINK-2173] file paths are always explicitly passed to Python
[GitHub] flink pull request: [FLINK-2565] Support primitive Arrays as keys
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1043#issuecomment-134618042 @StephanEwen I've reimplemented hashCode() and compare() accordingly.
[GitHub] flink pull request: [FLINK-2565] Support primitive Arrays as keys
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1043#issuecomment-134663435 @fhueske Added the test you requested.
[GitHub] flink pull request: [FLINK-2668] Chained Projections are no longer...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1279#issuecomment-150177113 @fhueske I've made the changes you suggested.
[GitHub] flink pull request: [FLINK-1982] [record-api] Remove dependencies ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1294#discussion_r42808744 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java --- @@ -39,7 +38,7 @@ import org.junit.Test; @SuppressWarnings("deprecation") --- End diff -- This suppression can be removed.
[GitHub] flink pull request: [FLINK-2901] Remove Record API dependencies fr...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/1306 [FLINK-2901] Remove Record API dependencies from flink-tests This PR removes all dependencies to the Record API in the flink-tests project. For this, tests were either ported or removed. The following files were NOT ported:

* recordJobs/*
  * except kmeans/udfs CoordVector, PointInFormat (moved & ported)
  * except util/InfiniteIntegerInputFormat (moved & ported)
* recordJobTests/*
* operators/*
  * except CoGroupSortITCase, MapPartitionITCase, ObjectReuseITCase (moved)
* accumulators/AccumulatorIterativeITCase
  * unfinished test, should be addressed in a separate issue
* IterationTerminationWithTwoTails
  * nigh-identical to IterationTerminationWithTerminationTail (the ported version also failed)
* iterative/DeltaPageRankITCase, IterativeKMeansITCase, KMeansITCase
  * behaved like they belong into /recordJobTests/ (as in they simply execute a Record job), thus removed
* optimizer/iterations/IterativeKMeansITCase
  * overlaps with ConnectedComponentsITCase (need a second opinion on this!)
* util/FailingTestBase
  * integrated into TaskFailureITCase (only user of the class)
* testPrograms/util/tests/*

cancelling/MatchJoinCancellingITCase was ported but disabled since it is unstable, and was also disabled before. A separate issue seems appropriate.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 2901_record_tests Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1306.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1306 commit ae3e5d64240feb0f2d9f5b74b709567ee3c66ec4 Author: Fabian Hueske <fhue...@apache.org> Date: 2015-10-22T19:13:59Z Remove ITCases for Record API operators commit 88fcd3419774a48bc42c21fc46a745129bc4c0ac Author: Fabian Hueske <fhue...@apache.org> Date: 2015-10-22T19:15:38Z Move operator ITCases into correct package commit 4b9cecbe747d1131849070223821cbb4446a2a48 Author: Fabian Hueske <fhue...@apache.org> Date: 2015-10-22T19:18:04Z Remove Record API dependencies from CC iteration tests commit 75052028dc8f29b4280d9c29d61fcb6f6c00ff0a Author: Fabian Hueske <fhue...@apache.org> Date: 2015-10-22T19:19:34Z Remove Record API dependencies from WordCount compiler test commit 91ca56c0df8a07cd142fd764b13af6ea3d7dae8a Author: Fabian Hueske <fhue...@apache.org> Date: 2015-10-22T19:20:34Z Remove Record API program tests commit 491afa1ea220f5292c36910d5c4f0c2491edc859 Author: zentol <s.mo...@web.de> Date: 2015-10-25T14:07:37Z Remove deprecation warning suppressions commit a32a0d1b33c7ffcd4a997fa1cdd11fe299bc7e60 Author: zentol <s.mo...@web.de> Date: 2015-10-27T20:22:38Z [FLINK-2901] Remove Record API dependencies from flink-tests #1 commit 0600da3154665aaf1ee02510e24f8222ca12af5f Author: zentol <s.mo...@web.de> Date: 2015-10-27T20:22:45Z [FLINK-2901] Remove Record API dependencies from flink-tests #2
[GitHub] flink pull request: [FLINK-2692] Untangle CsvInputFormat
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1266#discussion_r44439588 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java --- @@ -18,32 +18,97 @@ package org.apache.flink.api.java.io; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; +import org.apache.flink.api.common.io.GenericCsvInputFormat; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.types.parser.FieldParser; -import org.apache.flink.api.common.typeutils.CompositeType; -import org.apache.flink.api.java.tuple.Tuple; +import java.io.IOException; import org.apache.flink.core.fs.Path; import org.apache.flink.util.StringUtils; -public class CsvInputFormat<OUT> extends CommonCsvInputFormat<OUT> { +public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> { private static final long serialVersionUID = 1L; + + public static final String DEFAULT_LINE_DELIMITER = "\n"; + + public static final String DEFAULT_FIELD_DELIMITER = ","; + + protected transient Object[] parsedValues; - public CsvInputFormat(Path filePath, CompositeType<OUT> typeInformation) { - super(filePath, typeInformation); + protected CsvInputFormat(Path filePath) { + super(filePath); } - - public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, CompositeType<OUT> typeInformation) { - super(filePath, lineDelimiter, fieldDelimiter, typeInformation); + + @Override + public void open(FileInputSplit split) throws IOException { + super.open(split); + + @SuppressWarnings("unchecked") + FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers(); + + //throw exception if no field parsers are available + if (fieldParsers.length == 0) { + throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input"); + } + + // create the value holders + this.parsedValues = new Object[fieldParsers.length]; + for (int i = 0; i < fieldParsers.length; i++) { + this.parsedValues[i] = fieldParsers[i].createValue(); + } + + // left to right evaluation makes access [0] okay + // this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default + if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) { + this.lineDelimiterIsLinebreak = true; + } + + this.commentCount = 0; + this.invalidLineCount = 0; } @Override - protected OUT createTuple(OUT reuse) { - Tuple result = (Tuple) reuse; - for (int i = 0; i < parsedValues.length; i++) { - result.setField(parsedValues[i], i); + public OUT nextRecord(OUT record) throws IOException { + OUT returnRecord = null; + do { + returnRecord = super.nextRecord(record); + } while (returnRecord == null && !reachedEnd()); + + return returnRecord; + } + + public Class<?>[] getFieldTypes() { + return super.getGenericFieldTypes(); + } + + protected static boolean[] createDefaultMask(int size) { + boolean[] includedMask = new boolean[size]; + for (int x=0; x<includedMask.length; x++) { + includedMask[x] = true; + } + return includedMask; + } + + protected static boolean[] toBooleanMask(int[] sourceFieldIndices) { --- End diff -- I see your point, but I don't think it's due to these methods. It follows a similar implementation in GenericCsvInputFormat.setFieldsGeneric that was used until now. The key thing is that previously we checked the indices for a monotonic order, so the case you described couldn't occur. That check wasn't technically necessary, hence I removed it. We can either re-add that check, or add documentation to the CsvInputFormat constructor and the Scala ExecutionEnvironment.readCsvFile method.
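The index-to-mask conversion under discussion can be sketched as follows. This is an illustration, not the Flink code: selected field indices become a boolean include-mask, and the mask discards the *order* of the indices, which is exactly the pitfall the old monotonic-order check prevented, since {2, 0} yields the same mask as {0, 2}.

```java
// Illustrative sketch: convert selected field indices into a boolean
// include-mask covering positions 0..max(index).
public class FieldMask {
    static boolean[] toBooleanMask(int[] sourceFieldIndices) {
        int max = -1;
        for (int i : sourceFieldIndices) {
            max = Math.max(max, i); // highest selected field decides mask length
        }
        boolean[] mask = new boolean[max + 1];
        for (int i : sourceFieldIndices) {
            mask[i] = true; // order of indices is lost here
        }
        return mask;
    }
}
```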
[GitHub] flink pull request: [FLINK-2692] Untangle CsvInputFormat
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1266#discussion_r44435787

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
 package org.apache.flink.api.java.io;

+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
+import java.io.IOException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.StringUtils;

-public class CsvInputFormat extends CommonCsvInputFormat {
+public abstract class CsvInputFormat extends GenericCsvInputFormat {

 	private static final long serialVersionUID = 1L;
+
+	public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+	public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+	protected transient Object[] parsedValues;

-	public CsvInputFormat(Path filePath, CompositeType typeInformation) {
-		super(filePath, typeInformation);
+	protected CsvInputFormat(Path filePath) {
+		super(filePath);
 	}
-
-	public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, CompositeType typeInformation) {
-		super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
+
+	@Override
+	public void open(FileInputSplit split) throws IOException {
+		super.open(split);
+
+		@SuppressWarnings("unchecked")
+		FieldParser[] fieldParsers = (FieldParser[]) getFieldParsers();
+
+		//throw exception if no field parsers are available
+		if (fieldParsers.length == 0) {
+			throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input");
+		}
+
+		// create the value holders
+		this.parsedValues = new Object[fieldParsers.length];
+		for (int i = 0; i < fieldParsers.length; i++) {
+			this.parsedValues[i] = fieldParsers[i].createValue();
+		}
+
+		// left to right evaluation makes access [0] okay
+		// this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default
+		if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n') {
+			this.lineDelimiterIsLinebreak = true;
+		}
+
+		this.commentCount = 0;
+		this.invalidLineCount = 0;
 	}

 	@Override
-	protected OUT createTuple(OUT reuse) {
-		Tuple result = (Tuple) reuse;
-		for (int i = 0; i < parsedValues.length; i++) {
-			result.setField(parsedValues[i], i);
+	public OUT nextRecord(OUT record) throws IOException {
+		OUT returnRecord = null;
+		do {
+			returnRecord = super.nextRecord(record);
+		} while (returnRecord == null && !reachedEnd());
+
+		return returnRecord;
+	}
+
+	public Class[] getFieldTypes() {
+		return super.getGenericFieldTypes();
+	}
+
+	protected static boolean[] createDefaultMask(int size) {
--- End diff --

I wanted to cover that case directly in the InputFormat instead of *somewhere* else. This method is used to create a mask for exactly that case, when we can infer the mask from the number of field types.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
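The default-mask behavior being discussed can be sketched as a standalone method (an illustrative reconstruction, not the actual Flink code; the class name MaskUtil is made up):

```java
import java.util.Arrays;

// Sketch of a default include mask: when the caller supplies N field types
// but no explicit projection, the first N CSV columns are all included.
public class MaskUtil {
    public static boolean[] createDefaultMask(int size) {
        boolean[] includedMask = new boolean[size];
        Arrays.fill(includedMask, true);
        return includedMask;
    }
}
```

With this helper, a format configured with three field types and no include mask can simply infer `createDefaultMask(3)`, which is the case the comment above refers to.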
[GitHub] flink pull request: [FLINK-2692] Untangle CsvInputFormat
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1266#discussion_r44435810 (same CsvInputFormat.java diff hunk as in the previous comment)

--- End diff --

*cover it in an obvious manner
[GitHub] flink pull request: [FLINK-2692] Untangle CsvInputFormat
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1266#issuecomment-155764846 @fhueske I've addressed your comments.
[GitHub] flink pull request: [FLINK-2441] Introduce Python OperationInfo
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/1352 [FLINK-2441] Introduce Python OperationInfo

Introduces an OperationInfo object to the Python API, replacing the previously used dictionary. This is mostly a cosmetic change: it makes the code within DataSet and ExecutionEnvironment shorter (and IMO more readable) and makes the general structure more similar to the Java side. Furthermore, all fields are initialized with a default value.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 2441_pyopinfo Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1352.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1352 commit ce23a695355db363c96eeec5c639b0597f802f96 Author: zentol <s.mo...@web.de> Date: 2015-07-24T18:18:47Z [FLINK-2441] Introduce Python OperationInfo Introduces an OperationInfo object, replacing the previously used dictionary. This change generally makes related code shorter.
[GitHub] flink pull request: [FLINK-3012] Refactor boilerplate code in Data...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1355#issuecomment-156869041 I'm not too fond of the new version either, and I feel like this bit of code doesn't really need more clarity, simply because it isn't a lot of code. Using a counter at all doesn't really make sense to me; you aren't effectively counting anything. A boolean seems more intuitive. It is nit-picky, but we are going for clarity here after all :)
[GitHub] flink pull request: [FLINK-2914] Add missing break Statement in ZK...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/1359 [FLINK-2914] Add missing break Statement in ZKJobGraphStore You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 2914_break Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1359.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1359 commit e54198f479f8366288c432ae77de54b3683416c7 Author: zentol <ches...@apache.org> Date: 2015-11-16T10:35:28Z [FLINK-2914] Add missing break Statement in ZKJobGraphStore
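The bug class fixed by this PR can be illustrated with a minimal, hypothetical switch (this is not the actual ZKJobGraphStore code; the event names and class are made up for the example):

```java
// Demonstrates why a missing 'break' matters: without the first break,
// execution falls through into the NODE_REMOVED branch and the "added"
// action is silently overwritten by "removed".
public class SwitchFallThroughDemo {
    public static String handle(String eventType) {
        String action = "ignored";
        switch (eventType) {
            case "NODE_ADDED":
                action = "added";
                break; // removing this break would fall through to the next case
            case "NODE_REMOVED":
                action = "removed";
                break;
            default:
                break;
        }
        return action;
    }
}
```

Because Java switch cases fall through by default, such bugs compile cleanly and only show up at runtime, which is why they are easy to miss in review.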
[GitHub] flink pull request: [FLINK-2901] Remove Record API dependencies fr...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1306#issuecomment-157382048 @fhueske I've addressed most of your concerns. Things that still need work / clarification:
* PreviewPlanDumpTest was previously executed with 2 different sets of arguments, now only with 1. Should this be changed back to the previous behaviour? The arguments affect source/sink paths, parallelism and the number of iterations.
* test.classloading.jar.KMeansForTest appears to be a good replacement for the IterativeKMeansITCase; what's your take on that?
* The removed delta iteration PageRank program looks very similar to the ConnectedComponents implementation under flink-examples. I don't think this needs a separate port.
[GitHub] flink pull request: [FLINK-2901] Remove Record API dependencies fr...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1306#discussion_r44392775

--- Diff: flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java ---
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-@SuppressWarnings("deprecation")
-public class IterationTerminationWithTwoTails extends RecordAPITestBase {
-
-	private static final String INPUT = "1\n" + "2\n" + "3\n" + "4\n" + "5\n";
-	private static final String EXPECTED = "22\n";
-
-	protected String dataPath;
-	protected String resultPath;
-
-	public IterationTerminationWithTwoTails(){
-		setTaskManagerNumSlots(parallelism);
-	}
-
-	@Override
-	protected void preSubmit() throws Exception {
-		dataPath = createTempFile("datapoints.txt", INPUT);
-		resultPath = getTempFilePath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED, resultPath);
-	}
-
-	@Override
-	protected Plan getTestJob() {
-		return getTestPlanPlan(parallelism, dataPath, resultPath);
-	}
-
-	private static Plan getTestPlanPlan(int numSubTasks, String input, String output) {
-
-		FileDataSource initialInput = new FileDataSource(TextInputFormat.class, input, "input");
-
-		BulkIteration iteration = new BulkIteration("Loop");
-		iteration.setInput(initialInput);
-		iteration.setMaximumNumberOfIterations(5);
-		Assert.assertTrue(iteration.getMaximumNumberOfIterations() > 1);
-
-		ReduceOperator sumReduce = ReduceOperator.builder(new SumReducer())
-			.input(iteration.getPartialSolution())
-			.name("Compute sum (Reduce)")
-			.build();
-
-		iteration.setNextPartialSolution(sumReduce);
-
-		MapOperator terminationMapper = MapOperator.builder(new TerminationMapper())
-			.input(iteration.getPartialSolution())
--- End diff --

so THAT was the difference, will port it!
[GitHub] flink pull request: [FLINK-2901] Remove Record API dependencies fr...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1306#discussion_r44393132

--- Diff: flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java ---
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.testdata.KMeansData;
-import org.apache.flink.test.util.RecordAPITestBase;
-
-
-public class IterativeKMeansITCase extends RecordAPITestBase {
--- End diff --

does the KMeansForTest qualify for this?
[GitHub] flink pull request: [FLINK-2692] Untangle CsvInputFormat
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1266#issuecomment-152584371 We could move the differing lines (Csv: L114, Pojo: L218-L225) into a separate method that is called from a generic readRecord() method, something like fillRecord(OUT reuse, Object[] parsedValues).
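The refactoring suggested above is a classic template-method setup: the base class owns the shared parsing flow, and only the step of writing parsed values into the record differs between the tuple and POJO variants. A rough sketch (class names and bodies are illustrative assumptions, not the actual Flink implementation; only the fillRecord signature comes from the comment):

```java
// Template-method sketch: the base class owns the shared logic and
// delegates the type-specific part to an abstract method.
abstract class CsvFormatSketch<OUT> {
    // Shared part: identical for the tuple and POJO variants.
    public OUT readRecord(OUT reuse, Object[] parsedValues) {
        return fillRecord(reuse, parsedValues);
    }

    // Differing part: how parsed values are written into the record.
    protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
}

// Illustrative subclass standing in for the tuple variant: copies the
// parsed values positionally into a reused Object[] record.
class ArrayCsvFormat extends CsvFormatSketch<Object[]> {
    @Override
    protected Object[] fillRecord(Object[] reuse, Object[] parsedValues) {
        System.arraycopy(parsedValues, 0, reuse, 0, parsedValues.length);
        return reuse;
    }
}
```

A POJO variant would instead assign each parsed value to a field of the reused object, while the shared readRecord() loop stays untouched in the base class.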
[GitHub] flink pull request: [FLINK-2851] Move PythonAPI to flink-libraries
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/1257 [FLINK-2851] Move PythonAPI to flink-libraries You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink _move_libraries Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1257.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1257 commit f9d108973c3c01f56be557cf056f193a69728e75 Author: zentol <s.mo...@web.de> Date: 2015-10-14T14:49:31Z [FLINK-2851] Move PythonAPI to flink-libraries
[GitHub] flink pull request: [FLINK-2692] Untangle CsvInputFormat
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/1266 [FLINK-2692] Untangle CsvInputFormat This PR splits the CsvInputFormat into a Tuple and a POJO version. To this end, the (Common)CsvInputFormat classes were merged, and the type-specific portions were refactored into separate classes. Additionally, the ScalaCsvInputFormat has been removed; the Java and Scala APIs now use the same InputFormats. Previously, the formats differed in the way they created the output tuples; this is now realized in a newly introduced abstract method "createOrReuseInstance(Object[] fieldValues, T reuse)" within the TupleSerializerBase. Fields to include and field names are no longer passed via setters, but instead via the constructor. Several new constructors were added to accommodate different use cases, along with 2 new static methods to generate a default include mask or to convert an int[] list of field indices to a boolean include mask. Classes no longer have to be passed separately, as they are extracted from the TypeInformation object. A few sanity checks were moved from the ExecutionEnvironment to the InputFormat. The testReadSparseWithShuffledPositions test was removed since a monotonic order of field indices is not, and afaik never was, actually necessary, due to the way it is converted to a boolean[]. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 2692_csv Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1266.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1266 commit d497415adc2e58b4e9912ae89a53444825416366 Author: zentol <s.mo...@web.de> Date: 2015-10-18T18:23:23Z [FLINK-2692] Untangle CsvInputFormat
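The second static helper the PR description mentions, converting an int[] list of field indices to a boolean include mask, could be sketched as follows (a hypothetical standalone version with a made-up class name; the real helper lives in CsvInputFormat). Because only the set of positions matters, the input indices do not need to be in monotonic order, which is why the shuffled-positions test could be dropped:

```java
// Converts selected field positions into a boolean include mask whose
// length is max(index) + 1, with true at every selected position.
public class IncludeMask {
    public static boolean[] toBooleanMask(int[] sourceFields) {
        int max = 0;
        for (int field : sourceFields) {
            if (field < 0) {
                throw new IllegalArgumentException("Field indices must not be negative.");
            }
            max = Math.max(max, field);
        }
        boolean[] mask = new boolean[max + 1];
        for (int field : sourceFields) {
            mask[field] = true;
        }
        return mask;
    }
}
```

For example, both {0, 2} and {2, 0} produce the same mask, since the mask records only which positions are included, not the order in which they were listed.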