[GitHub] flink pull request: [FLINK-1330] [build] Build creates a link in t...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/333#discussion_r23527422

--- Diff: flink-dist/pom.xml ---
@@ -436,6 +436,37 @@ under the License.
 				</gitDescribe>
 			</configuration>
 		</plugin>
+		<!-- create a symbolic link to the build target in the root directory -->
+		<plugin>
+			<groupId>com.pyx4j</groupId>
+			<artifactId>maven-junction-plugin</artifactId>
+			<version>1.0.3</version>
+			<executions>
+				<execution>
+					<phase>package</phase>
+					<goals>
+						<goal>link</goal>
+					</goals>
+				</execution>
+				<execution>
+					<id>unlink</id>
+					<phase>clean</phase>
+					<goals>
+						<goal>unlink</goal>
+					</goals>
+				</execution>
+			</executions>
+			<configuration>
+				<links>
+					<link>
+						<dst>${basedir}/../build-target</dst>

--- End diff --

Isn't basedir deprecated? In the next line you use project.basedir.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1395] Add support for JodaTime in KryoS...
Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/304
[GitHub] flink pull request: [FLINK-1396][FLINK-1303] Hadoop Input/Output d...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/363 [FLINK-1396][FLINK-1303] Hadoop Input/Output directly in API This adds methods on ExecutionEnvironment for reading with Hadoop Input/OutputFormat. This also adds support in the Scala API for Hadoop Input/OutputFormats. I also added tests and updated the documentation. You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink hadoop-in-api Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/363.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #363 commit 94376ce914c740e9880bf161e90ae92a0ced39ed Author: Aljoscha Krettek aljoscha.kret...@gmail.com Date: 2015-01-28T14:13:30Z [FLINK-1396][FLINK-1303] Hadoop Input/Output directly in API This adds methods on ExecutionEnvironment for reading with Hadoop Input/OutputFormat. This also adds support in the Scala API for Hadoop Input/OutputFormats.
[GitHub] flink pull request: [FLINK-1463] Fix stateful/stateless Serializer...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/353 [FLINK-1463] Fix stateful/stateless Serializers and Comparators Before, Serializers would announce whether they are stateful or not and rely on RuntimeStatefulSerializerFactory to do the duplication. Comparators, on the other hand, had a duplicate method that the user was required to call. This commit removes the stateful/stateless property from Serializers but instead introduces a duplicate() method, similar to Comparators, that can return the same instance. The two serializer factories are merged into one that always calls duplicate() before returning a serializer. You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink serializer-factories-fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/353.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #353 commit 91834fee239b372b9a39f3c8f89ecbe42e2ae23a Author: Aljoscha Krettek aljoscha.kret...@gmail.com Date: 2015-01-30T15:43:31Z [FLINK-1463] Fix stateful/stateless Serializers and Comparators Before, Serializers would announce whether they are stateful or not and rely on RuntimeStatefulSerializerFactory to do the duplication. Comparators, on the other hand, had a duplicate method that the user was required to call. This commit removes the stateful/stateless property from Serializers but instead introduces a duplicate() method, similar to Comparators, that can return the same instance. The two serializer factories are merged into one that always calls duplicate() before returning a serializer.
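The duplicate() pattern described in this PR can be sketched with a minimal, hypothetical serializer interface. All names here are illustrative, not Flink's actual TypeSerializer API; the point is only that stateless serializers make duplication free by returning themselves, while stateful ones hand out fresh instances:

```java
import java.io.ByteArrayOutputStream;

// Hypothetical sketch of the duplicate() pattern, not Flink's real API.
interface Serializer<T> {
    // Stateless serializers may return 'this'; stateful ones must
    // return a fresh instance so each task gets its own state.
    Serializer<T> duplicate();
}

final class StatelessIntSerializer implements Serializer<Integer> {
    @Override
    public Serializer<Integer> duplicate() {
        return this; // no mutable state, safe to share across tasks
    }
}

final class StatefulBufferingSerializer implements Serializer<byte[]> {
    // Mutable per-instance state: sharing this across tasks would be unsafe.
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    @Override
    public Serializer<byte[]> duplicate() {
        return new StatefulBufferingSerializer(); // fresh internal buffer
    }
}

// The merged factory always calls duplicate() before handing out a
// serializer; the stateless case makes that call effectively a no-op.
final class SerializerFactory {
    static <T> Serializer<T> create(Serializer<T> prototype) {
        return prototype.duplicate();
    }
}
```

This mirrors the design choice in the PR: instead of the factory asking "are you stateful?" and branching, every serializer decides for itself whether duplication means sharing or copying.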
[GitHub] flink pull request: Add support for Subclasses, Interfaces, Abstra...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/236#issuecomment-72433217 Yes, it is not a good solution. But what you propose isn't either: if we use Kryo for those subclasses that we cannot handle then nothing works anymore. The whole reason we have support for POJOs is that we can theoretically compare them in their binary representation. We are not doing this right now (the PojoComparator is always comparing in deserialised form) but we added it with that goal in mind. If we don't want to do that anymore we can just get rid of the whole POJO serialisation code and use Kryo for everything.
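The "compare them in their binary representation" idea mentioned above can be illustrated with a toy example, independent of Flink's actual comparator code: if values are encoded so that their encoding order matches their logical order, comparisons can run on raw bytes without deserialising. For a signed int, flipping the sign bit of the big-endian two's-complement encoding achieves exactly that (this is a standard trick, shown here as a sketch):

```java
import java.nio.ByteBuffer;

// Toy illustration of binary (byte-wise) comparison: flip the sign bit
// so that unsigned lexicographic byte order equals signed int order.
final class BinaryComparison {
    static byte[] encode(int value) {
        // XOR with Integer.MIN_VALUE flips the sign bit.
        return ByteBuffer.allocate(4).putInt(value ^ Integer.MIN_VALUE).array();
    }

    // Unsigned lexicographic comparison of two equal-length byte arrays.
    static int compareBytes(byte[] a, byte[] b) {
        for (int i = 0; i < a.length; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return 0;
    }
}
```

The payoff is that a sorter never needs to materialise objects just to order them, which is the motivation given above for keeping the POJO serialisation path.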
[GitHub] flink pull request: [FLINK-1458] Allow Interfaces and abstract typ...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/357 [FLINK-1458] Allow Interfaces and abstract types in TypeExtractor Kryo already supports them, so it was just a question of the TypeExtractor allowing them. I also added tests for this. You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink interfaces-generic-type-info-fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/357.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #357 commit 6b1733178592c5c47145db165eb9c6797b156e19 Author: Aljoscha Krettek aljoscha.kret...@gmail.com Date: 2015-02-02T15:08:18Z [FLINK-1458] Allow Interfaces and abstract types in TypeExtractor Kryo already supports them, so it was just a question of the TypeExtractor allowing them.
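The check involved is plain reflection; the following is a hedged sketch of the kind of classification the type extractor needs to do (hypothetical names, not Flink's actual TypeExtractor code): an interface or abstract class cannot be analysed as a POJO, so it falls back to a generic, Kryo-backed type info instead of being rejected.

```java
import java.lang.reflect.Modifier;

// Illustrative classification, not Flink's actual TypeExtractor logic:
// interfaces and abstract classes fall back to a generic (Kryo-backed)
// type info instead of raising an error.
final class TypeClassifier {
    enum Kind { POJO_CANDIDATE, GENERIC }

    static Kind classify(Class<?> clazz) {
        if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())) {
            // Kryo can serialise these because it records the concrete
            // runtime subclass with each value.
            return Kind.GENERIC;
        }
        return Kind.POJO_CANDIDATE;
    }
}
```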
[GitHub] flink pull request: [FLINK-1458] Allow Interfaces and abstract typ...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/357#issuecomment-72642370 You're right.
[GitHub] flink pull request: [FLINK-1396][FLINK-1303] Hadoop Input/Output d...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/363#issuecomment-73040767 I think when executing it in an IDE the dependencies are not there, since flink-java does not depend on flink-runtime, which has the Hadoop dependencies.
[GitHub] flink pull request: [FLINK-1463] Fix stateful/stateless Serializer...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/353#issuecomment-73077279 Do you think that with the additional checking logic this would really make up for one superfluous duplication?
[GitHub] flink pull request: [FLINK-1463] Fix stateful/stateless Serializer...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/353#issuecomment-73079161 Ok, then I'll add this.
[GitHub] flink pull request: [FLINK-1458] Allow Interfaces and abstract typ...
Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/357
[GitHub] flink pull request: [FLINK-1396][FLINK-1303] Hadoop Input/Output d...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/363#issuecomment-73020223 I addressed the comments. What do the others think about overloading readFile()? I made it like this on purpose, so that the user sees in the API that they are using Hadoop input formats, or that they can be used.
[GitHub] flink pull request: Allow KeySelectors to implement ResultTypeQuer...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/354#issuecomment-72752435 Nope, sorry, I also have no idea why this is happening.
[GitHub] flink pull request: [FLINK-1395] Add support for JodaTime in KryoS...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/304#discussion_r22968293 --- Diff: flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java --- @@ -99,6 +104,7 @@ public void testCopy() { for (T datum : testData) { T copy = serializer.copy(datum); + String str = copy.toString(); --- End diff -- Will change.
[GitHub] flink pull request: Add support for Subclasses, Interfaces, Abstra...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/236#issuecomment-70216890 No objections, your honour.
[GitHub] flink pull request: [FLINK-1395] Add support for JodaTime in KryoS...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/304#discussion_r22999053 --- Diff: flink-java/pom.xml --- @@ -64,6 +64,18 @@ under the License. <version>0.5.1</version> </dependency> + <dependency> --- End diff -- They are actually optional dependencies. They are not included unless we explicitly include them.
[GitHub] flink pull request: [FLINK-1399] Add support for registering Seria...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/305#issuecomment-70065763 I added register methods at the ExecutionEnvironment.
[GitHub] flink pull request: [FLINK-1395] Add support for JodaTime in KryoS...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/304#issuecomment-70063665 I added it to LICENSE and NOTICE and also addressed the other issues.
[GitHub] flink pull request: [FLINK-947] Add a declarative expression API
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/405#issuecomment-74842606 Yeah, I'm not sure about linq either. I like the name but realise that it might be problematic. What do the others think? I could call it flink-expressions. I will add documentation about which types are supported and a good error message for unsupported types, as @rmetzger mentioned.
[GitHub] flink pull request: [FLINK-1417] Automatically register types with...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/393#issuecomment-74843783 Yes please, go ahead. But if our POJO stuff is really that slow we should think about how to improve it or remove it altogether.
[GitHub] flink pull request: [FLINK-1417] Automatically register types with...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/393#issuecomment-74744132 What exactly are you running? TPC-H Query 3? Maybe we should test how fast Kryo would be with the PojoComparator.
[GitHub] flink pull request: [FLINK-1417] Automatically register types with...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/393#discussion_r24749284

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
@@ -292,7 +360,76 @@ public void registerKryoType(Class<?> type) {
 	/**
 	 * Returns the registered POJO types.
 	 */
-	public Set<Class<?>> getRegisteredPojoTypes() {
+	public List<Class<?>> getRegisteredPojoTypes() {
 		return registeredPojoTypes;
 	}
+
+	public boolean isDisableAutoTypeRegistration() {
+		return disableAutoTypeRegistration;
+	}
+
+	/**
+	 * Control whether Flink is automatically registering all types in the user programs with
+	 * Kryo.
+	 *
+	 * @param disableAutoTypeRegistration
+	 */
+	public void setDisableAutoTypeRegistration(boolean disableAutoTypeRegistration) {

--- End diff --

I would prefer disableAutoTypeRegistration here. And then isAutoTypeRegistrationDisabled, above.
[GitHub] flink pull request: [FLINK-947] Add a declarative expression API
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/405 [FLINK-947] Add a declarative expression API This one is quite big, so you should check out the documentation, scaladoc, examples and test cases to see how the API works. You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink linq Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/405.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #405 commit 147525ced43db6690a64fbae1395dbd258b8901d Author: Aljoscha Krettek aljoscha.kret...@gmail.com Date: 2014-10-03T16:25:15Z Change translateToDataflow to return Operator Before, translateToDataflow of SingleInputOperator could only return a single input operator of the lower layer, same for TwoInputOperator. This change allows translateToDataflow to return more kinds of operators. commit 58b5b9ec6e65855bfd71287deb6352dfc4498451 Author: Aljoscha Krettek aljoscha.kret...@gmail.com Date: 2014-10-23T16:09:38Z Add methods to CompositeType for querying field types and names commit ac29ee3ad36a72d7c41549f38da1a00e66d85041 Author: Aljoscha Krettek aljoscha.kret...@gmail.com Date: 2014-10-01T11:12:18Z [FLINK-947] Add a declarative expression API
[GitHub] flink pull request: [FLINK-1422] Add withParameters() to documenta...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/350#issuecomment-72039131 +1 looks good to me
[GitHub] flink pull request: [FLINK-1460] fix typos
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/346#issuecomment-72036147 +1, can you merge it @hsaputra or should I?
[GitHub] flink pull request: Add support for Subclasses, Interfaces, Abstra...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/236#issuecomment-71436404 I will have to rework this now that the support for registering Types and Serializers at Kryo was merged. The POJO subclass with tagging is slower because we do additional checks and lookups: Upon serialisation we perform a map lookup to check whether the subclass is actually a registered class. When deserialising we have to fetch the correct subclass serialiser from an array of subclass serialisers.
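The tag-lookup overhead described above can be made concrete with a small sketch. This is not the PR's actual implementation; it only models the two lookups mentioned: a map lookup on the write path (class to tag) and an array index on the read path (tag back to the per-subclass entry):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of subclass tagging, mirroring the two lookups
// described above: map lookup on write, array fetch on read.
final class SubclassRegistry {
    private final Map<Class<?>, Integer> classToTag = new HashMap<>(); // write path
    private final Class<?>[] tagToClass;                               // read path

    SubclassRegistry(Class<?>... registeredSubclasses) {
        this.tagToClass = registeredSubclasses.clone();
        for (int tag = 0; tag < tagToClass.length; tag++) {
            classToTag.put(tagToClass[tag], tag);
        }
    }

    // Serialisation: a map lookup checks that the concrete subclass
    // is actually registered, and yields the tag to write.
    int tagFor(Class<?> subclass) {
        Integer tag = classToTag.get(subclass);
        if (tag == null) {
            throw new IllegalArgumentException("unregistered subclass: " + subclass);
        }
        return tag;
    }

    // Deserialisation: the tag read from the stream indexes into the
    // array of registered subclasses (in Flink, subclass serialisers).
    Class<?> classFor(int tag) {
        return tagToClass[tag];
    }
}
```

Both lookups are cheap individually, but they sit on the per-record hot path, which is why the comment reports a measurable slowdown versus the plain POJO path.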
[GitHub] flink pull request: [FLINK-1369] [types] Add support for Subclasse...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/316#issuecomment-71445361 It's almost the same, except for the change to handle Interfaces and Abstract Classes with GenericTypeInfo, correct? The part that changes the KryoSerializer must be adapted because of my recently merged PR that allows registering types and serializers at Kryo.
[GitHub] flink pull request: Add support for Subclasses, Interfaces, Abstra...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/236#discussion_r24403182 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java --- @@ -169,4 +186,113 @@ public ExecutionConfig disableObjectReuse() { public boolean isObjectReuseEnabled() { return objectReuse; } + + // + // Registry for types and serializers + // + + /** +* Registers the given Serializer as a default serializer for the given type at the --- End diff -- yes :dancers:
[GitHub] flink pull request: [FLINK-1463] Fix stateful/stateless Serializer...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/353#issuecomment-73501310 Manually merged.
[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a GroupC...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/466#issuecomment-82886862 I would say it's good to go now.
[GitHub] flink pull request: Make Expression API available to Java, Rename ...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/503#issuecomment-83969916 Correct, that's why I'm doing a pull request. People can chime in here if they want. Or should we continue the discussion on the mailing list? I thought everyone would be more or less happy with Table.
[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a partia...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/466#issuecomment-78101926 Sorry, I completely blanked. Of course, you still need the grouping; it's only the shuffle step you don't need. So, I suggest only better tests, using a combination of partitionByHash() and groupReducePartial().
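The semantics being discussed here can be illustrated abstractly: a partial group reduce runs the reduce function within each partition only, with no shuffle, so a key that appears in several partitions produces several partial results. This is a toy model of that behaviour, not Flink's runtime code:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a partial group-reduce (not Flink's runtime): each
// partition is reduced independently, so the same key can show up
// in the output of several partitions unless the data was
// hash-partitioned by key beforehand.
final class PartialGroupReduce {
    static Map<String, Integer> reducePartition(List<Map.Entry<String, Integer>> partition) {
        Map<String, Integer> sums = new HashMap<>();
        for (Map.Entry<String, Integer> e : partition) {
            sums.merge(e.getKey(), e.getValue(), Integer::sum); // per-partition sum
        }
        return sums;
    }

    // No shuffle: partitions never exchange records.
    static List<Map<String, Integer>> run(List<List<Map.Entry<String, Integer>>> partitions) {
        List<Map<String, Integer>> out = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> p : partitions) {
            out.add(reducePartition(p));
        }
        return out;
    }
}
```

Running this on two partitions that both contain key "a" yields two partial sums for "a", which is exactly why the comment suggests testing it in combination with partitionByHash(): hash-partitioning first ensures each key lives in only one partition, making the partial result total.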
[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a partia...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/466#discussion_r26105008

--- Diff: flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReducePartialProperties.java ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.operators;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.compiler.dag.SingleInputNode;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.LocalProperties;
+import org.apache.flink.compiler.dataproperties.PartitioningProperty;
+import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.compiler.dataproperties.RequestedLocalProperties;
+import org.apache.flink.compiler.plan.Channel;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+import java.util.Collections;
+import java.util.List;
+
+public final class GroupReducePartialProperties extends OperatorDescriptorSingle {
+
+	private final Ordering ordering; // ordering that we need to use if an additional ordering is requested
+
+	public GroupReducePartialProperties(FieldSet groupKeys, Ordering additionalOrderKeys) {
+		super(groupKeys);
+
+		// if we have an additional ordering, construct the ordering to have primarily the grouping fields
+		this.ordering = new Ordering();
+		for (Integer key : this.keyList) {
+			this.ordering.appendOrdering(key, null, Order.ANY);
+		}
+
+		// and next the additional order fields

--- End diff --

indentation
[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a partia...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/466#discussion_r26105136

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java ---
@@ -156,6 +156,23 @@ public SortedGrouping(DataSet<T> set, Keys<T> keys, String field, Order order) {
 		return new GroupReduceOperator<T, R>(this, resultType, dataSet.clean(reducer), Utils.getCallLocationName());
 	}

+	/**
+	 * Applies a partial GroupReduce transformation on a grouped and sorted {@link DataSet}.
+	 *
+	 * In contrast to the reduceGroup transformation, the GroupReduce function is only called on each partition. Thus,
+	 * partial solutions are likely to occur.
+	 * @param reducer The ReduceFunction that is applied on the DataSet.
+	 * @return A GroupReducePartial operator which represents the partially reduced DataSet.
+	 */
+	public <R> GroupReducePartialOperator<T, R> reduceGroupPartially(GroupReduceFunction<T, R> reducer) {
+		if (reducer == null) {
+			throw new NullPointerException("GroupReduce function must not be null.");
+		}
+		TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getDataSet().getType());
+
+		return new GroupReducePartialOperator<T, R>(this, resultType, dataSet.clean(reducer), Utils.getCallLocationName());
+	}
+

--- End diff --

Why do we have this here? Partial GroupReduce doesn't make sense on a grouping. That's what a regular GroupReduce is. Or am I missing something?
[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a partia...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/466#discussion_r26105155

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java ---
@@ -159,7 +159,23 @@ public UnsortedGrouping(DataSet<T> set, Keys<T> keys) {
 		return new GroupReduceOperator<T, R>(this, resultType, dataSet.clean(reducer), Utils.getCallLocationName());
 	}
-
+
+	/**
+	 * Applies a partial GroupReduce transformation on a grouped {@link DataSet}.
+	 * In contrast to the reduceGroup transformation, the GroupReduce function is only called on each partition. Thus,
+	 * partial solutions are likely to occur.
+	 * @param reducer The ReduceFunction that is applied on the DataSet.
+	 * @return A GroupReducePartial operator which represents the partially reduced DataSet
+	 */
+	public <R> GroupReducePartialOperator<T, R> reduceGroupPartially(GroupReduceFunction<T, R> reducer) {
+		if (reducer == null) {
+			throw new NullPointerException("GroupReduce function must not be null.");
+		}
+		TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getDataSet().getType());
+
+		return new GroupReducePartialOperator<T, R>(this, resultType, dataSet.clean(reducer), Utils.getCallLocationName());
+	}
+

--- End diff --

Why do we have this here? Partial GroupReduce doesn't make sense on a grouping. That's what a regular GroupReduce is. Or am I missing something?
[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a partia...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/466#discussion_r26105226
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -355,6 +355,63 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
+   * Partial variant of the reduceGroup transformation which operates only on the individual
+   * partitions. This may lead to partially reduced results.
+   * Creates a new [[DataSet]] by passing for each group (elements with the same key) the list
+   * of elements to the group reduce function. The function must output one element. The
+   * concatenation of those will form the resulting [[DataSet]].
+   */
+  def reduceGroupPartially[R: TypeInformation: ClassTag](
+      fun: (Iterator[T]) => R): DataSet[R] = {
+    Validate.notNull(fun, "Group reduce function must not be null.")
+    val reducer = new GroupReduceFunction[T, R] {
+      val cleanFun = set.clean(fun)
+      def reduce(in: java.lang.Iterable[T], out: Collector[R]) {
+        out.collect(cleanFun(in.iterator().asScala))
+      }
+    }
+    wrap(
+      new GroupReducePartialOperator[T, R](maybeCreateSortedGrouping(),
+        implicitly[TypeInformation[R]], reducer, getCallLocationName()))
+  }
+
+  /**
+   * Partial variant of the reduceGroup transformation which operates only on the individual
+   * partitions. This may lead to partially reduced results.
+   * Creates a new [[DataSet]] by passing for each group (elements with the same key) the list
+   * of elements to the group reduce function. The function can output zero or more elements using
+   * the [[Collector]]. The concatenation of the emitted values will form the resulting [[DataSet]].
+   */
+  def reduceGroupPartially[R: TypeInformation: ClassTag](
+      fun: (Iterator[T], Collector[R]) => Unit): DataSet[R] = {
+    Validate.notNull(fun, "Group reduce function must not be null.")
+    val reducer = new GroupReduceFunction[T, R] {
+      val cleanFun = set.clean(fun)
+      def reduce(in: java.lang.Iterable[T], out: Collector[R]) {
+        cleanFun(in.iterator().asScala, out)
+      }
+    }
+    wrap(
+      new GroupReducePartialOperator[T, R](maybeCreateSortedGrouping(),
+        implicitly[TypeInformation[R]], reducer, getCallLocationName()))
+  }
+
+  /**
+   * Partial variant of the reduceGroup transformation which operates only on the individual
+   * partitions. This may lead to partially reduced results.
+   * Creates a new [[DataSet]] by passing for each group (elements with the same key) the list
+   * of elements to the [[GroupReduceFunction]]. The function can output zero or more elements. The
+   * concatenation of the emitted values will form the resulting [[DataSet]].
+   */
+  def reduceGroupPartially[R: TypeInformation: ClassTag](
+      reducer: GroupReduceFunction[T, R]): DataSet[R] = {
+    Validate.notNull(reducer, "GroupReduce function must not be null.")
+    wrap(
+      new GroupReducePartialOperator[T, R](maybeCreateSortedGrouping(),
+        implicitly[TypeInformation[R]], reducer, getCallLocationName()))
+  }
+
+  /**
--- End diff --
Again, why a partial reduce on a grouped dataset? That's what the regular GroupReduce is.
[GitHub] flink pull request: [FLINK-1622][java-api][scala-api] add a partia...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/466#issuecomment-78016698 I like the implementation, except for my comments on groupReducePartial() on grouped DataSets. Also, the tests seem a bit shady because of all the grouping and regular reduceGroup operations. I would suggest partitioning the data using a manual partition operation and then applying a GroupReducePartial.
[GitHub] flink pull request: Make Expression API available to Java, Rename ...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/503#issuecomment-86112294 I fixed @rmetzger's remarks. Still waiting for a solution to the naming issue.
[GitHub] flink pull request: [FLINK-1769] Fix deploy bug caused by ScalaDoc...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/535#issuecomment-86416475 I just wanted to leave it sitting here for a while. But if no-one has any reservations I'll merge it today.
[GitHub] flink pull request: [FLINK-1788] [table] Make logical plans transf...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/550 [FLINK-1788] [table] Make logical plans transformable You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink flinq-mutable-plans Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/550.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #550 commit 6b9d8e7e76a563534927451c8c61707d71f51cca Author: Aljoscha Krettek aljoscha.kret...@gmail.com Date: 2015-03-26T13:45:43Z [FLINK-1788] [table] Make logical plans transformable
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-76973194 Yes, this sounds good. Another thing: it has probably already come up, but I just want to make sure: you implement CoGroup and Reduce the way you do because of performance, correct? That is, you don't do any work in the user code of a ReduceOperator but instead do it in a chained MapPartition, because there you get all the elements, which makes communication with the Python process more efficient. Same with CoGroup, where you implement your own grouping logic in Python from the raw input streams. Overall I like the architecture: the communication between the host and the guest language is well abstracted, and I can see this being reused for other languages. Could you rename the CoGroupPython* classes to something more generic? They really are part of the generic language binding and not specific to Python, correct?
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-76981299 You could call it CoGroupRaw, just an idea... Once that and the split into the Python and generic parts are done, I vote for merging this. The API looks good, and other things, such as getting rid of the type annotations, can be worked on afterwards. I think it would be good to get interested people to try it out. Also, the code is very well commented and documented. :smile_cat:
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-76738603 I'm the next person to be looking at this. Hopefully we can merge it after I've looked at it. :smile: @zentol Do you want to keep it in the current location or do you want to move it to flink-python?
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-77127742 Thanks, I have two last requests, sorry for that. Could you rename flink-generic to flink-language-binding-generic? The problem is that the package name is now flink-generic; it pops up like this in Maven Central and so on, without the information that it is actually a sub-package of flink-language-binding. This could be quite confusing. In MapFunction.py and FilterFunction.py you use map() and filter() respectively. These operations are not lazy, i.e. map() first applies the user map function to every element in the partition and then collects the results. This can become a problem if the input is very big. Instead we should iterate over the iterator and output each element right after mapping. This keeps memory consumption low. The same applies to filter().
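The eager-versus-lazy distinction described above can be sketched in plain Python. The helper names are hypothetical; this is not the actual flink-python code:

```python
def eager_map(fn, iterator, collector):
    # Non-lazy: first maps the WHOLE partition into a list, then emits.
    # Peak memory use grows with the size of the partition.
    results = list(map(fn, iterator))
    for r in results:
        collector.append(r)

def lazy_map(fn, iterator, collector):
    # Lazy: maps one element and emits it immediately, so memory use
    # stays constant no matter how large the partition is.
    for x in iterator:
        collector.append(fn(x))

out = []
lazy_map(lambda x: x * 2, iter(range(5)), out)
print(out)  # [0, 2, 4, 6, 8]
```

Both variants produce identical output; the difference only shows up as peak memory consumption on large partitions, which is exactly the concern raised in the comment.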
[GitHub] flink pull request: [FLINK-1769] Fix deploy bug caused by ScalaDoc...
Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/535
[GitHub] flink pull request: Add support for Subclasses, Interfaces, Abstra...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/236#issuecomment-72161285 Then it fails at runtime, which makes me very uneasy. But then again, stuff can always fail at runtime when the user uses some strange subclass. Even more so without this PR.
[GitHub] flink pull request: Stream graph + internal refactor
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/594#issuecomment-92854325 Regarding StreamRecord, is the UID still required? In my understanding we are working towards state snapshotting, which would not require having IDs in the records anymore, correct? I mentioned at some earlier point that the IDs are not used anymore, so what's the status now? If we get rid of the UID, then we can also get rid of StreamRecord and the StreamRecord serializer.
[GitHub] flink pull request: Stream graph + internal refactor
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/594#issuecomment-92856455 By the way, I'm making all these comments just to keep track of things. If we discuss them and want to implement some changes, I can also do this myself; I just want to get opinions here.
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94443054 I just ran it on the cluster. Works like a charm. :smile: For word count, Python takes 12 minutes, Java about 2:40. But this should be expected, I guess. Good to merge now, in my opinion.
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94402978 I was referring to the way communication is handled between the Java host and the generic language client: communication between them is not based on a fixed set of messages (for example, messages defined using something like Protobuf or Avro); instead, the knowledge about how messages are structured is implicit in the code that does the messaging. So the Java side expects a sequence of primitives (integers, strings) in a certain order, and the Python side knows that order and sends them in it.
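A minimal sketch of such an implicit-order protocol (illustrative only; the actual language-binding wire format differs): nothing in the byte stream describes the message, both sides simply hard-code the same sequence of primitives.

```python
import struct
from io import BytesIO

def write_string(buf, s):
    data = s.encode("utf-8")
    buf.write(struct.pack(">i", len(data)))  # length prefix, big-endian int
    buf.write(data)

def read_string(buf):
    (length,) = struct.unpack(">i", buf.read(4))
    return buf.read(length).decode("utf-8")

# "Java side": writes an operator id, then its name. The order is
# fixed purely by convention and never described in the stream.
buf = BytesIO()
buf.write(struct.pack(">i", 7))
write_string(buf, "map")

# "Python side": must read back the exact same sequence in the same order.
buf.seek(0)
(op_id,) = struct.unpack(">i", buf.read(4))
op_name = read_string(buf)
print(op_id, op_name)  # prints: 7 map
```

A schema-based approach (Protobuf, Avro) would instead make the message structure explicit and checkable on both sides, at the cost of an extra dependency and a defined schema.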
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94400959 I'll test it again on a cluster. Could you please elaborate a bit? Is the timeout still in? Communication is through TCP instead of the mapped files, but still with the same basic interface of writing basic values for communication?
[GitHub] flink pull request: [FLINK-1799][scala] Fix handling of generic ar...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/582#issuecomment-94392757 Any more thoughts? Otherwise I would like to merge this.
[GitHub] flink pull request: [FLINK-1799][scala] Fix handling of generic ar...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/582#discussion_r28692599
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java ---
@@ -143,6 +143,18 @@ else if (type instanceof Class<?> && ((Class<?>) type).isArray()
  throw new InvalidTypesException("The given type is not a valid object array.");
 }
 
+ /**
+  * Creates a new {@link org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo} from a
+  * {@link TypeInformation} for the component type.
+  *
+  * <p>
+  * This must be used in cases where the complete type of the array is not available as a
+  * {@link java.lang.reflect.Type} or {@link java.lang.Class}.
+  */
+ public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(TypeInformation<C> componentInfo) {
+  return new ObjectArrayTypeInfo<T, C>(Object[].class, componentInfo.getTypeClass(), componentInfo);
--- End diff --
That's right, I didn't think of that. Will change.
[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/612 [FLINK-1867/1880] Raise test timeouts in hope of fixing Travis fails You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink raise-test-timeouts Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/612.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #612 commit 4df27ee0ec1ce68376d51c4a882116651fd52788 Author: Aljoscha Krettek aljoscha.kret...@gmail.com Date: 2015-04-13T14:16:51Z [FLINK-1867/1880] Raise test timeouts in hope of fixing Travis fails
[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/612#issuecomment-94799019 Any thoughts on this? I would really like to merge this to improve Travis reliability.
[GitHub] flink pull request: [FLINK-377] [FLINK-671] Generic Interface / PA...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/202#issuecomment-94798439 I merged it. :smile: Thanks a lot @zentol for staying with this for so long. Great work! P.S. Could you please close this PR? I always forget adding the closes #... message.
[GitHub] flink pull request: [FLINK-1799][scala] Fix handling of generic ar...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/582#issuecomment-94799103 So, any thoughts about merging this?
[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/442#issuecomment-95846017 The problem is that I can't see it in the GitHub interface. On what branch are your changes? Could you please rebase them on top of the current master?
[GitHub] flink pull request: [FLINK-1789] [core] [runtime] [java-api] Allow...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/593#issuecomment-95845500 OK, @StephanEwen, any thoughts on this? Should we allow that the local user code class loader in the client potentially doesn't have the same jars available as the workers?
[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/612#issuecomment-95946066 Manually merged.
[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...
Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/612
[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/621#issuecomment-95926348 The build still fails because of missing license headers in the model package. By the way, did you write the files in the model package yourself or were they generated?
[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/442#issuecomment-95570894 Where is your git repository, so that I can check out your commit and merge it?
[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/612#discussion_r28943704
--- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala ---
@@ -18,24 +18,23 @@
 package org.apache.flink.api.scala.runtime.jobmanager
 
-import akka.actor.Status.{Success, Failure}
+import akka.actor.Status.Success
 import akka.actor.{ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
+import org.junit.Ignore
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.client.JobExecutionException
-import org.apache.flink.runtime.jobgraph.{JobGraph, AbstractJobVertex}
-import org.apache.flink.runtime.jobmanager.Tasks.{NoOpInvokable, BlockingNoOpInvokable}
+import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, JobGraph}
+import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, NoOpInvokable}
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.{JobManagerTerminated,
-NotifyWhenJobManagerTerminated}
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.{JobManagerTerminated, NotifyWhenJobManagerTerminated}
 import org.apache.flink.runtime.testingUtils.TestingUtils
 import org.apache.flink.test.util.ForkableFlinkMiniCluster
-import org.junit.Ignore
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 
 @Ignore("Contains a bug with Akka 2.2.1")
--- End diff --
True, removing it.
[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/612#discussion_r28943849
--- Diff: flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java ---
@@ -112,9 +112,9 @@ public void testTaskManagerProcessFailure() {
   Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
 
   Configuration jmConfig = new Configuration();
-  jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1 s");
-  jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4 s");
-  jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
+  jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1 ms");
--- End diff --
Seemed to run flawlessly, though. :smile: I ran the tests about 100 times by now without seeing another failure in the targeted tests.
[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/612#discussion_r28943875
--- Diff: flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java ---
@@ -112,9 +112,9 @@ public void testTaskManagerProcessFailure() {
   Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
 
   Configuration jmConfig = new Configuration();
-  jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1 s");
-  jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4 s");
-  jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
+  jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1 ms");
+  jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s");
+  jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 20);
--- End diff --
See above.
[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/612#discussion_r28943824
--- Diff: flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java ---
@@ -112,9 +112,9 @@ public void testTaskManagerProcessFailure() {
   Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
 
   Configuration jmConfig = new Configuration();
-  jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1 s");
-  jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4 s");
-  jmConfig.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
+  jmConfig.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1 ms");
--- End diff --
That's a typo, will fix it.
[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/612#discussion_r28943731
--- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala ---
@@ -136,9 +135,9 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers)
     config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
-    config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
+    config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s")
--- End diff --
As above, it doesn't affect test execution time.
[GitHub] flink pull request: [FLINK-1789] [core] [runtime] [java-api] Allow...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/593#issuecomment-95500680 This does not work if the user uses classes that are not available on the local machine since you don't add the additional class path entries in JobWithJars.buildUserCodeClassLoader(). Correct?
[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/442#issuecomment-95511047 This looks good to merge. Any objections?
[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/612#discussion_r28943598 --- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala --- @@ -231,9 +231,9 @@ with WordSpecLike with Matchers with BeforeAndAfterAll { config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers) config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms") -config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms") +config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s") --- End diff -- No, it doesn't: before and after this change, the 4 tests complete in about 8 seconds. @tillrohrmann suggested that, since the actor system is local, there are some other mechanisms in play that signal failure in this case.
[GitHub] flink pull request: [FLINK-1472] Fixed Web frontend config overvie...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/439#issuecomment-95517673 Hi, sorry for the long wait on this. I really like the feature, but the implementation is not scalable: if new config values are added, this now needs to be updated in several places. Could you change ConfigConstants and add a static initializer block that builds, via reflection, the hash maps that you manually build in DefaultConfigKeyValues? The code would just need to loop through all fields whose names end in _KEY and then find the matching default-value field without the _KEY suffix. From the default-value field, the type of the value can be determined, and it can be added to the appropriate hash map. This way, the defaults will always stay up to date with the actual config constants.
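The static-initializer idea above can be sketched in a few lines of Java. This is only an illustration: the `MyConstants` class and its fields are invented stand-ins for Flink's much larger ConfigConstants, and the real implementation would need to handle more value types.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;

public class DefaultsViaReflection {

    // Hypothetical stand-in for ConfigConstants: a *_KEY field holds the
    // config key, and a field with the same name minus _KEY holds the default.
    public static class MyConstants {
        public static final String JOB_MANAGER_PORT_KEY = "jobmanager.rpc.port";
        public static final int JOB_MANAGER_PORT = 6123;
    }

    /** Pairs every *_KEY field with its default and returns key -> default value. */
    public static Map<String, Object> buildDefaults(Class<?> constants) throws Exception {
        Map<String, Object> defaults = new HashMap<>();
        for (Field field : constants.getFields()) {
            if (Modifier.isStatic(field.getModifiers()) && field.getName().endsWith("_KEY")) {
                String baseName = field.getName()
                        .substring(0, field.getName().length() - "_KEY".length());
                try {
                    Field defaultField = constants.getField(baseName);
                    // The runtime type of the boxed default (String, Integer,
                    // Boolean, ...) tells us which map it belongs in.
                    defaults.put((String) field.get(null), defaultField.get(null));
                } catch (NoSuchFieldException ignored) {
                    // key has no matching default value; skip it
                }
            }
        }
        return defaults;
    }
}
```

Because the map is derived from the constants class itself, adding a new `FOO_KEY`/`FOO` pair automatically shows up in the defaults without touching a second class.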
[GitHub] flink pull request: [FLINK-1789] [core] [runtime] [java-api] Allow...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/593#issuecomment-95564844 Yes, this is true, but the way it is implemented, the folders are not always added to the class loader. Maybe I'm wrong here, but JobWithJars.getUserCodeClassLoader and JobWithJars.buildUserCodeClassLoader don't add the URLs to the ClassLoader that they create.
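The point being made is that collecting URLs is not enough; they have to be handed to the URLClassLoader constructor, or classes on those paths can never be resolved. A minimal sketch of such a builder (the class and method names here are illustrative, not Flink's actual JobWithJars API):

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

public class UserCodeClassLoaderSketch {

    /**
     * Builds a class loader that actually contains both the user jars and the
     * extra class path entries. The crucial step is passing every URL to the
     * URLClassLoader constructor -- URLs that are merely collected elsewhere
     * are silently invisible to class loading.
     */
    public static URLClassLoader build(List<URL> jarFiles, List<URL> classPaths,
                                       ClassLoader parent) {
        List<URL> all = new ArrayList<>(jarFiles);
        all.addAll(classPaths); // without this, the extra entries are lost
        return new URLClassLoader(all.toArray(new URL[0]), parent);
    }
}
```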
[GitHub] flink pull request: [FLINK-1398] Introduce extractSingleField() in...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/308#issuecomment-95566780 Yes, I think we should start a discussion there. I just wanted to give the reasons for my opinion here.
[GitHub] flink pull request: [FLINK-1472] Fixed Web frontend config overvie...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/439#issuecomment-95580562 Yes, but then we should change this now and not build more code on top of this that can fail in the future if someone forgets to add the names to the correct hash set in some other class.
[GitHub] flink pull request: [FLINK-1472] Fixed Web frontend config overvie...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/439#issuecomment-95581307 I added a JIRA for this: https://issues.apache.org/jira/browse/FLINK-1936
[GitHub] flink pull request: [FLINK-924] Add automatic dependency retrieval...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/35#issuecomment-95590430 Hi @qmlmoon, sorry for the long wait on this PR. Could you please rebase on top of the current master and also get rid of the merge commits in the process?
[GitHub] flink pull request: [FLINK-1867/1880] Raise test timeouts in hope ...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/612#discussion_r28961799 --- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala --- @@ -231,9 +231,9 @@ with WordSpecLike with Matchers with BeforeAndAfterAll { config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers) config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms") -config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms") +config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s") --- End diff -- Yes, removing them also from JobManagerFailsITCase.
[GitHub] flink pull request: [FLINK-1472] Fixed Web frontend config overvie...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/439#issuecomment-95561718 But then I think the solution is to normalise the constants in ConfigConstants.
[GitHub] flink pull request: [FLINK-1799][scala] Fix handling of generic ar...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/582#issuecomment-94483743 I fixed @StephanEwen's complaint. It was incorrect, but the type class of TypeInformation does not seem to be used in any place where it matters.
[GitHub] flink pull request: [FLINK-1927] [py] Operator distribution rework
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/638#issuecomment-97412959 But doesn't this mean that the lambdas now must be stateless? I.e., if a user refers to some variable outside the lambda, it will not be serialised with the closure anymore (because there is no serialization of the closure anymore).
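What "serializing the closure" buys can be shown with a small Java sketch (the PR itself concerns the Python API, so this is only an analogy; the `SerFn` interface name is invented): when the function object is serialized, the captured free variable travels with it, and a scheme that stops serializing closures would lose that value.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class ClosureCapture {

    // A serializable function type, so the lambda (and whatever it captures)
    // can be shipped over the wire like a user function in a dataflow system.
    public interface SerFn<T, R> extends Function<T, R>, Serializable {}

    /** Serializes fn and deserializes it again, as a framework would. */
    @SuppressWarnings("unchecked")
    public static SerFn<Integer, Boolean> roundTrip(SerFn<Integer, Boolean> fn)
            throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(fn);
        ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        return (SerFn<Integer, Boolean>) in.readObject();
    }

    public static void main(String[] args) throws Exception {
        int threshold = 42; // free variable captured by the lambda below
        SerFn<Integer, Boolean> fn = x -> x > threshold;

        // The captured value 42 survives the round trip because it is part of
        // the serialized closure. A stateless-only scheme would not carry it.
        SerFn<Integer, Boolean> copy = roundTrip(fn);
        System.out.println(copy.apply(100)); // prints: true
    }
}
```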
[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/621#issuecomment-97401561 Our Travis builds are a bit unstable right now. I'm running some last tests and then I'll merge this. Thanks for staying with this and working on my requests! :smile:
[GitHub] flink pull request: [FLINK-1924] Minor Refactoring
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/616#issuecomment-96573887 +1, can you merge it, @mxm?
[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/621#issuecomment-96565454 The tests are failing because you use spaces in your code for indentation. Could you please change all indentation to tabs to satisfy the style checker?
[GitHub] flink pull request: [FLINK-1789] [core] [runtime] [java-api] Allow...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/593#issuecomment-96532907 Yes, this would make things a lot cleaner. @twalthr what do you think?
[GitHub] flink pull request: [FLINK-924] Add automatic dependency retrieval...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/35#issuecomment-96960370 Thanks for working on this with me. Very nice contribution. :smile:
[GitHub] flink pull request: [streaming] New Source and state checkpointing...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/643#issuecomment-98690522 Also, the key would be a property of a DataStream, and other operations could also use this.
[GitHub] flink pull request: [FLINK-1615] [java api] SimpleTweetInputFormat
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/621#issuecomment-96953776 Unfortunately, Travis cuts off the log if it is too long, like here: https://travis-ci.org/aljoscha/flink/jobs/60177866 (that's from your pull request). You have to click on the "Download log" button; then you can view the whole log, where you see the checkstyle errors. You can also see the errors if you run mvn clean verify on your local machine. It seems your code still contains tabs. (From the recently failed Travis builds)
[GitHub] flink pull request: [FLINK-1978] Fix POJO deserialization copy f...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/655#issuecomment-99757603 Looks good. Can you go ahead and merge it?
[GitHub] flink pull request: [FLINK-1977] Rework Stream Operators to always...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/659 [FLINK-1977] Rework Stream Operators to always be push based You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink event-time Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/659.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #659 commit 2e2abbd4afb6f058abb7b5d4a287349cdef5d8f6 Author: Aljoscha Krettek aljoscha.kret...@gmail.com Date: 2015-05-04T13:53:36Z [FLINK-1977] Rework Stream Operators to always be push based commit 58843949102c0506b30c80cfb36065d303a64cda Author: Aljoscha Krettek aljoscha.kret...@gmail.com Date: 2015-05-06T16:11:52Z Change StreamOperator to be an Interface This introduces new AbstractOperator classes for UDF operators and operators without UDFs. commit 4e0fadd4c4069817362382dbe8c7287f3b0150ff Author: Aljoscha Krettek aljoscha.kret...@gmail.com Date: 2015-05-07T07:39:54Z Change StreamOperator to take Output instead of Collector Output is a richer Interface that extends Collector. Right now it does not contain additional methods. But this will be extended to support forwarding of Low Watermarks/Barriers. commit f0d2c3bb47a63ed50f543732665a8b88872d839d Author: Aljoscha Krettek aljoscha.kret...@gmail.com Date: 2015-05-07T09:06:52Z Simplify StreamTask and derived classes commit a4be5138c6262e104d83263d6e4800e416d6fd4a Author: Aljoscha Krettek aljoscha.kret...@gmail.com Date: 2015-05-07T09:19:44Z Remove unused imports
[GitHub] flink pull request: [hotfix][scala] Let type analysis work on some...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/660 [hotfix][scala] Let type analysis work on some Java types This is mostly to allow Vertex, which is a Java Tuple2, to be used in the Scala Gelly API. You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink scala-type-analysis-fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/660.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #660 commit 6c382dccc6ddc825f348aa9dc91bf64d857f400e Author: Aljoscha Krettek aljoscha.kret...@gmail.com Date: 2015-05-07T12:27:33Z [hotfix][scala] Let type analysis work on some Java types This is mostly to allow Vertex, which is a Java Tuple2, to be used in the Scala Gelly API.
[GitHub] flink pull request: [FLINK-1977] Rework Stream Operators to always...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/659#issuecomment-100144884 I think this would not solve our problems. I will start a discussion thread on the dev list.
[GitHub] flink pull request: [FLINK-1977] Rework Stream Operators to always...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/659#discussion_r29851342 --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java --- @@ -64,6 +64,10 @@ private volatile boolean isRunning = false; --- End diff -- Correct, removing.
[GitHub] flink pull request: [FLINK-1977] Rework Stream Operators to always...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/659#discussion_r29851632 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java --- @@ -36,9 +36,9 @@ /** * Main work method of the source. This function is invoked at the beginning of the --- End diff -- Oversight on my part, and I forgot to rework the FileMonitoringFunction (a source).
[GitHub] flink pull request: [FLINK-1977] Rework Stream Operators to always...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/659#discussion_r29854565 --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java --- @@ -1,149 +1,149 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.flink.streaming.connectors.flume; - -import java.util.List; - -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.connectors.ConnectorSource; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.util.Collector; -import org.apache.flume.Context; -import org.apache.flume.channel.ChannelProcessor; -import org.apache.flume.source.AvroSource; -import org.apache.flume.source.avro.AvroFlumeEvent; -import org.apache.flume.source.avro.Status; - -public class FlumeSource<OUT> extends ConnectorSource<OUT> { - private static final long serialVersionUID = 1L; - - String host; - String port; - volatile boolean finished = false; - - private volatile boolean isRunning = false; - - FlumeSource(String host, int port, DeserializationSchema<OUT> deserializationSchema) { - super(deserializationSchema); - this.host = host; - this.port = Integer.toString(port); - } - - public class MyAvroSource extends AvroSource { - Collector<OUT> collector; - - /** -* Sends the AvroFlumeEvent from it's argument list to the Apache Flink -* {@link DataStream}. -* -* @param avroEvent -*The event that should be sent to the dataStream -* @return A {@link Status}.OK message if sending the event was -* successful. -*/ - @Override - public Status append(AvroFlumeEvent avroEvent) { - collect(avroEvent); - return Status.OK; - } - - /** -* Sends the AvroFlumeEvents from it's argument list to the Apache Flink -* {@link DataStream}. -* -* @param events -*The events that is sent to the dataStream -* @return A Status.OK message if sending the events was successful. -*/ - @Override - public Status appendBatch(List<AvroFlumeEvent> events) { - for (AvroFlumeEvent avroEvent : events) { - collect(avroEvent); - } - - return Status.OK; - } - - /** -* Deserializes the AvroFlumeEvent before sending it to the Apache Flink -* {@link DataStream}.
-* -* @param avroEvent -*The event that is sent to the dataStream -*/ - private void collect(AvroFlumeEvent avroEvent) { - byte[] b = avroEvent.getBody().array(); - OUT out = FlumeSource.this.schema.deserialize(b); - - if (schema.isEndOfStream(out)) { - FlumeSource.this.finished = true; - this.stop(); - FlumeSource.this.notifyAll(); - } else { - collector.collect(out); - } - - } - - } - - MyAvroSource avroSource; - - /** -* Configures the AvroSource. Also sets the collector so the application can -* use it from outside of the invoke function. -* -* @param collector -*The collector used in the invoke function -*/ - public void configureAvroSource(Collector<OUT>
[GitHub] flink pull request: [FLINK-1977] Rework Stream Operators to always...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/659#discussion_r29855036 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java --- @@ -183,6 +173,16 @@ public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { } } + @Override + public void close() throws Exception { + super.close(); + if (activePolicyThread != null) { + activePolicyThread.interrupt(); + } + + emitWindow(); --- End diff -- I think it's fine in streaming. open(), receiveElement(), close() are all called in the inner loop of StreamTask.invoke()
[GitHub] flink pull request: [FLINK-1977] Rework Stream Operators to always...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/659#discussion_r29855250 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.io.IndexedReaderIterator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; +import org.apache.flink.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; + +public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> { + + private static final Logger LOG = LoggerFactory.getLogger(OneInputStreamTask.class); + + protected StreamRecordSerializer<IN> inSerializer; + protected IndexedReaderIterator<StreamRecord<IN>> recordIterator; + + + @Override + public void registerInputOutput() { + super.registerInputOutput(); + InputHandler<IN> inputHandler = new InputHandler<IN>(this); + inSerializer = inputHandler.getInputSerializer(); + recordIterator = inputHandler.getInputIter(); + } + + /* +* Reads the next record from the reader iterator and stores it in the +* nextRecord variable +*/ + protected StreamRecord<IN> readNext() throws IOException { + StreamRecord<IN> nextRecord = inSerializer.createInstance(); + try { + return recordIterator.next(nextRecord); + } catch (IOException e) { + if (isRunning) { + throw new RuntimeException("Could not read next record due to: " + + StringUtils.stringifyException(e)); + } else { + // Task already cancelled do nothing + return null; + } + } catch (IllegalStateException e) { + if (isRunning) { + throw new RuntimeException("Could not read next record due to: " + + StringUtils.stringifyException(e)); + } else { + // Task already cancelled do nothing + return null; + } + } + } + + @Override + public void invoke() throws Exception { + this.isRunning = true; + + boolean operatorOpen = false; + + if (LOG.isDebugEnabled()) { + LOG.debug("Task {} invoked", getName()); + } + + try { + openOperator();
+ operatorOpen = true; + + StreamRecord<IN> nextRecord; + while (isRunning && (nextRecord = readNext()) != null) { + streamOperator.receiveElement(nextRecord.getObject()); + } + + closeOperator(); + operatorOpen = false; + + if (LOG.isDebugEnabled()) { + LOG.debug("Task {} invocation finished", getName()); + } + + } catch (Exception e) { + + if (operatorOpen) { + try { + closeOperator(); + } catch (Throwable t) { + // TODO: why are we not doing anything here? --- End diff -- Seems reasonable.
[GitHub] flink pull request: [FLINK-1977] Rework Stream Operators to always...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/659#discussion_r29854531 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java --- @@ -165,11 +165,10 @@ private void decorateNode(Integer vertexID, JSONObject node) throws JSONException { node.put("PACT", "Data Stream"); } - StreamOperator<?, ?> operator = streamGraph.getStreamNode(vertexID).getOperator(); + StreamOperator<?> operator = streamGraph.getStreamNode(vertexID).getOperator(); - if (operator != null && operator.getUserFunction() != null) { - node.put("CONTENTS", vertex.getOperatorName() + " at " + operator.getUserFunction().getClass().getSimpleName()); + if (operator != null) { + node.put("CONTENTS", vertex.getOperatorName()); } else { node.put("CONTENTS", vertex.getOperatorName()); } --- End diff -- Changing.
[GitHub] flink pull request: [FLINK-1977] Rework Stream Operators to always...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/659#discussion_r29854951 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java --- @@ -60,29 +62,45 @@ public void run() throws Exception { groupedDiscretizers.put(key, groupDiscretizer); } - groupDiscretizer.processRealElement(nextObject); + groupDiscretizer.processRealElement(element); } - } - for (StreamDiscretizer<IN> group : groupedDiscretizers.values()) { - group.emitWindow(); - } + } @Override public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { super.open(parameters); - centralThread = new Thread(new CentralCheck()); + centralCheck = new CentralCheck(); + centralThread = new Thread(centralCheck); centralThread.start(); } + @Override + public void close() throws Exception { + super.close(); + for (StreamDiscretizer<IN> group : groupedDiscretizers.values()) { + group.emitWindow(); + } + + try { + centralCheck.running = false; + centralThread.interrupt(); + centralThread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); --- End diff -- But what do you want me to do?
[GitHub] flink pull request: [FLINK-1977] Rework Stream Operators to always...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/659#discussion_r29855135 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.io.IndexedReaderIterator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; +import org.apache.flink.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> { + + private static final Logger LOG = LoggerFactory.getLogger(OneInputStreamTask.class); + + protected StreamRecordSerializer<IN> inSerializer; + protected IndexedReaderIterator<StreamRecord<IN>> recordIterator; + + + @Override + public void registerInputOutput() { + super.registerInputOutput(); + InputHandler<IN> inputHandler = new InputHandler<IN>(this); + inSerializer = inputHandler.getInputSerializer(); + recordIterator = inputHandler.getInputIter(); + } + + /* +* Reads the next record from the reader iterator and stores it in the +* nextRecord variable +*/ + protected StreamRecord<IN> readNext() throws IOException { + StreamRecord<IN> nextRecord = inSerializer.createInstance(); + try { + return recordIterator.next(nextRecord); + } catch (IOException e) { + if (isRunning) { + throw new RuntimeException("Could not read next record due to: " + + StringUtils.stringifyException(e)); --- End diff -- Working on it.