[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15475975#comment-15475975 ] ramkrishna.s.vasudevan commented on FLINK-3322: --- [~ggevay] Thanks for the inputs. Let me look at that as well. I am just trying to make things work; I can make a PR once I feel it is good, maybe early next week. This whole week was busy with some personal things, so I could not complete it on time. > MemoryManager creates too much GC pressure with iterative jobs > -- > > Key: FLINK-3322 > URL: https://issues.apache.org/jira/browse/FLINK-3322 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Priority: Critical > Fix For: 1.0.0 > > Attachments: FLINK-3322.docx > > > When taskmanager.memory.preallocate is false (the default), released memory > segments are not added to a pool, but the GC is expected to take care of > them. This puts too much pressure on the GC with iterative jobs, where the > operators reallocate all memory at every superstep. > See the following discussion on the mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html > Reproducing the issue: > https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc > The class to start is malom.Solver. If you increase the memory given to the > JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. > (It will generate some lookup tables to /tmp on the first run, which takes a few minutes.) > (I think the slowdown might also depend somewhat on > taskmanager.memory.fraction, because more unused non-managed memory results > in rarer GCs.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
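The pooling the issue asks for can be sketched as a minimal free list of segments. This is an illustration only, not Flink's actual MemoryManager API; the class and method names below are hypothetical. The point is that a released segment is retained for reuse, so each superstep draws from the pool instead of producing fresh garbage.

```java
import java.util.ArrayDeque;

// Minimal sketch of segment pooling (hypothetical class, not Flink's MemoryManager):
// released segments are kept in a free list so iterative supersteps reuse them
// instead of allocating new arrays for the GC to collect.
public class SegmentPool {
    private final ArrayDeque<byte[]> free = new ArrayDeque<>();
    private final int segmentSize;

    public SegmentPool(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    public byte[] allocate() {
        byte[] seg = free.poll();              // reuse a pooled segment if one exists
        return seg != null ? seg : new byte[segmentSize];
    }

    public void release(byte[] seg) {
        free.push(seg);                        // retain for reuse instead of dropping to the GC
    }
}
```

With taskmanager.memory.preallocate set to false, released segments are instead left for the collector, which is what produces the pressure described above.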
[jira] [Commented] (FLINK-4546) Remove STREAM keyword in Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-4546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15475827#comment-15475827 ] ASF GitHub Bot commented on FLINK-4546: --- Github user wuchong commented on the issue: https://github.com/apache/flink/pull/2454 Hi @fhueske @twalthr , do you have time to take another look? Any advice is welcome. > Remove STREAM keyword in Stream SQL > > > Key: FLINK-4546 > URL: https://issues.apache.org/jira/browse/FLINK-4546 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > The goal is to unify the Batch SQL and Stream SQL grammars, in particular by removing the STREAM > keyword in Stream SQL. > Detailed discussion on the mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-thoughts-about-unify-Stream-SQL-and-Batch-SQL-grammer-td13060.html
[GitHub] flink issue #2454: [FLINK-4546] [table] Remove STREAM keyword in Stream SQL
Github user wuchong commented on the issue: https://github.com/apache/flink/pull/2454 Hi @fhueske @twalthr , do you have time to take another look? Any advice is welcome. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...
Github user vijikarthi commented on the issue: https://github.com/apache/flink/pull/2275 I have noticed the issue occasionally in the master branch too, but it is very inconsistent. I just rebased against the latest master and ran "mvn clean verify", and I don't see any errors. I have tried a couple of times to see if it reoccurs, but it looks good. I am not sure whether the issue was addressed by some other patches in the master branch. Could you please give it a try?
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15475694#comment-15475694 ] ASF GitHub Bot commented on FLINK-3929: --- Github user vijikarthi commented on the issue: https://github.com/apache/flink/pull/2275 I have noticed the issue occasionally in the master branch too, but it is very inconsistent. I just rebased against the latest master and ran "mvn clean verify", and I don't see any errors. I have tried a couple of times to see if it reoccurs, but it looks good. I am not sure whether the issue was addressed by some other patches in the master branch. Could you please give it a try? > Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Add support for a keytab credential to be associated with the Flink cluster, > to facilitate: > - Kerberos-authenticated data access for connectors > - Kerberos-authenticated ZooKeeper access > Support both the standalone and YARN deployment modes.
[jira] [Commented] (FLINK-3921) StringParser not specifying encoding to use
[ https://issues.apache.org/jira/browse/FLINK-3921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15475107#comment-15475107 ] ASF GitHub Bot commented on FLINK-3921: --- Github user rekhajoshm commented on the issue: https://github.com/apache/flink/pull/2060 Hi @greghogan sorry, I lost the thread on this; it is now updated with the Preconditions check. Unless I am missing something, enableQuotedStringParsing() is present in the subclasses because it differs slightly from the parent and involves if-else casting when invoked. But in the case of setCharset it is the same for all possible subclasses, so it does not need to be. Hope you agree. Thanks. > StringParser not specifying encoding to use > --- > > Key: FLINK-3921 > URL: https://issues.apache.org/jira/browse/FLINK-3921 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.3 >Reporter: Tatu Saloranta >Assignee: Rekha Joshi >Priority: Trivial > > Class `flink.types.parser.StringParser` has javadocs indicating that contents > are expected to be ASCII, similar to `StringValueParser`. That makes sense, > but when constructing the actual instance, no encoding is specified; e.g. on line 66: >this.result = new String(bytes, startPos+1, i - startPos - 2); > which means whatever the default platform encoding is gets used. If the contents > really are always ASCII (I would not count on that, as the parser is used from the CSV > reader), it is not a big deal, but it can lead to the usual Latin-1-vs-UTF-8 issues. > So I think the encoding should be explicitly specified, whatever is to be > used: the javadocs claim ASCII, so it could be "us-ascii", but it could well be UTF-8 > or even ISO-8859-1.
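The fix the report suggests is passing an explicit Charset to the String constructor. A minimal sketch of the difference (hypothetical helper, not the actual StringParser code):

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper illustrating the suggested fix: decode a quoted field
// with an explicit charset rather than the platform default, which is what
// new String(bytes, off, len) without a Charset argument falls back to.
public class FieldDecode {
    // Mirrors new String(bytes, startPos + 1, i - startPos - 2) but pins the encoding.
    static String decodeQuoted(byte[] bytes, int startPos, int i, Charset charset) {
        return new String(bytes, startPos + 1, i - startPos - 2, charset);
    }

    public static void main(String[] args) {
        byte[] record = "\"hello\"".getBytes(StandardCharsets.US_ASCII);
        // startPos points at the opening quote; i is one past the closing quote
        String field = decodeQuoted(record, 0, record.length, StandardCharsets.US_ASCII);
        System.out.println(field); // hello
    }
}
```

Whether the pinned charset should be US-ASCII, UTF-8, or ISO-8859-1 is exactly the open question in the issue; the sketch only shows that specifying one removes the platform dependence.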
[GitHub] flink issue #2060: [FLINK-3921] StringParser encoding
Github user rekhajoshm commented on the issue: https://github.com/apache/flink/pull/2060 Hi @greghogan sorry, I lost the thread on this; it is now updated with the Preconditions check. Unless I am missing something, enableQuotedStringParsing() is present in the subclasses because it differs slightly from the parent and involves if-else casting when invoked. But in the case of setCharset it is the same for all possible subclasses, so it does not need to be. Hope you agree. Thanks.
[jira] [Closed] (FLINK-4571) Configurable little parallelism in Gelly drivers
[ https://issues.apache.org/jira/browse/FLINK-4571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan closed FLINK-4571. - Resolution: Fixed Fix Version/s: (was: 1.2.0) Fixed in bdd3c0d94b2a6cdecb482ee3fdefe082fc1b7c4d > Configurable little parallelism in Gelly drivers > > > Key: FLINK-4571 > URL: https://issues.apache.org/jira/browse/FLINK-4571 > Project: Flink > Issue Type: Improvement > Components: Gelly >Affects Versions: 1.2.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > Several Gelly library implementations support a configurable "little > parallelism" which is important when scaling to large data sets. These > algorithms include operators at the beginning and end which process data on > the order of the original DataSet, as well as middle operators that exchange > 100s or 1000s of times more data. The "little parallelism" should be configurable in > the appropriate Gelly drivers in the flink-gelly-examples module.
[jira] [Updated] (FLINK-4571) Configurable little parallelism in Gelly drivers
[ https://issues.apache.org/jira/browse/FLINK-4571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-4571: -- Fix Version/s: 1.2.0
[jira] [Updated] (FLINK-4257) Handle delegating algorithm change of class
[ https://issues.apache.org/jira/browse/FLINK-4257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-4257: -- Fix Version/s: 1.2.0 > Handle delegating algorithm change of class > --- > > Key: FLINK-4257 > URL: https://issues.apache.org/jira/browse/FLINK-4257 > Project: Flink > Issue Type: Bug > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.2.0 > > > A class created by {{ProxyFactory}} can intercept and reinterpret method > calls using its {{MethodHandler}}, but is restricted in that > * the type of the proxy class cannot be changed > * method return types must be honored > We have algorithms such as {{VertexDegree}} and {{TriangleListing}} that > change their return type depending on configuration, even between single and dual > input functions. This can be problematic, e.g. in {{OperatorTranslation}} > where we test {{dataSet instanceof SingleInputOperator}} or {{dataSet > instanceof TwoInputOperator}}. > Even simply changing the operator can be problematic, e.g. > {{MapOperator.translateToDataFlow}} returns {{MapOperatorBase}} whereas > {{ReduceOperator.translateToDataFlow}} returns {{SingleInputOperator}}. > I see two ways to solve these issues. By adding a simple {{NoOpOperator}} > that is skipped over during {{OperatorTranslation}} we could wrap all > algorithm output and always be proxying the same class. > Alternatively, making changes only within Gelly, we can append a "no-op" > pass-through {{MapFunction}} to any algorithm output which is not a > {{SingleInputOperator}}. And {{Delegate}} can also walk the superclass > hierarchy such that we are always proxying {{SingleInputOperator}}. > There is one additional issue. When we call {{DataSet.output}} the delegate's > {{MethodHandler}} must reinterpret this call to add itself to the list of > sinks.
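The restrictions described (a proxy's runtime class is fixed at creation, and intercepted methods must honor their declared return types) are not specific to javassist's ProxyFactory; the JDK's own dynamic proxies behave the same way. A small illustration under that assumption (not Flink's Delegate code; the names here are hypothetical):

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;

// Illustration only: a delegating proxy forwards calls to a target, but its
// runtime class is fixed when the proxy is created and every method must keep
// its declared return type -- the same limits described for ProxyFactory.
public class DelegateDemo {
    @SuppressWarnings("unchecked")
    static Callable<String> delegate(Callable<String> target) {
        InvocationHandler handler = (proxy, method, args) ->
                method.invoke(target, args);   // forward (or reinterpret) the call
        return (Callable<String>) Proxy.newProxyInstance(
                DelegateDemo.class.getClassLoader(),
                new Class<?>[]{Callable.class},
                handler);
    }

    // Small helper so checked exceptions from call() do not leak everywhere.
    static String call(Callable<String> c) {
        try {
            return c.call();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Callable<String> proxied = delegate(() -> "ok");
        System.out.println(call(proxied));     // forwards to the target
        // The proxy's class is a generated one, fixed at creation time; it can
        // never later "become" a different class, which is the problem the
        // NoOpOperator / pass-through MapFunction proposals work around.
    }
}
```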
[jira] [Closed] (FLINK-4522) Gelly link broken in homepage
[ https://issues.apache.org/jira/browse/FLINK-4522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan closed FLINK-4522. - Resolution: Fixed Fixed in 95ad865bd9d7121c80ecd4d1ed92e2912052a502 > Gelly link broken in homepage > - > > Key: FLINK-4522 > URL: https://issues.apache.org/jira/browse/FLINK-4522 > Project: Flink > Issue Type: Bug > Components: Documentation, Gelly >Affects Versions: 1.1.0, 1.1.1 >Reporter: Vasia Kalavri >Assignee: Greg Hogan > Fix For: 1.2.0 > > > The link to the Gelly documentation is broken in the Flink homepage. The link > points to "docs/apis/batch/libs/gelly.md" which has been removed. Since this > link might be present in other places as well, e.g. slides, trainings, etc., > we should re-direct to the new location of the Gelly docs.
[jira] [Closed] (FLINK-4257) Handle delegating algorithm change of class
[ https://issues.apache.org/jira/browse/FLINK-4257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan closed FLINK-4257. - Resolution: Fixed Fixed in 8210ff468d64fc50520011fc6fed9909d2a6b89a
[jira] [Updated] (FLINK-4522) Gelly link broken in homepage
[ https://issues.apache.org/jira/browse/FLINK-4522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-4522: -- Fix Version/s: 1.2.0
[jira] [Commented] (FLINK-4257) Handle delegating algorithm change of class
[ https://issues.apache.org/jira/browse/FLINK-4257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15475031#comment-15475031 ] ASF GitHub Bot commented on FLINK-4257: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2474
[jira] [Commented] (FLINK-4571) Configurable little parallelism in Gelly drivers
[ https://issues.apache.org/jira/browse/FLINK-4571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15475029#comment-15475029 ] ASF GitHub Bot commented on FLINK-4571: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2475
[jira] [Commented] (FLINK-4522) Gelly link broken in homepage
[ https://issues.apache.org/jira/browse/FLINK-4522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15475030#comment-15475030 ] ASF GitHub Bot commented on FLINK-4522: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2464
[GitHub] flink pull request #2475: [FLINK-4571] [gelly] Configurable little paralleli...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2475
[GitHub] flink pull request #2474: [FLINK-4257] [gelly] Handle delegating algorithm c...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2474
[GitHub] flink pull request #2464: [FLINK-4522] [docs] Gelly link broken in homepage
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2464
[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers
[ https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474985#comment-15474985 ] ASF GitHub Bot commented on FLINK-3599: --- Github user Xazax-hun commented on a diff in the pull request: https://github.com/apache/flink/pull/2211#discussion_r78087961 --- Diff: flink-core/src/main/resources/PojoComparatorTemplate.ftl --- @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package ${packageName}; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.List; +import org.apache.flink.api.common.typeutils.CompositeTypeComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.types.NullKeyFieldException; +import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase; +import org.apache.flink.api.java.typeutils.runtime.GenTypeComparatorProxy; +import org.apache.flink.util.InstantiationUtil; + +public final class ${className} extends CompositeTypeComparator implements java.io.Serializable { --- End diff -- Janino ignores generics. If I wanted generics, I would need to validate the code with Java as well to be sure I got it right, so I decided not to bother with it for now. > GSoC: Code Generation in Serializers > > > Key: FLINK-3599 > URL: https://issues.apache.org/jira/browse/FLINK-3599 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Márton Balassi >Assignee: Gabor Horvath > Labels: gsoc2016, mentor > > The current implementation of the serializers can be a > performance bottleneck in some scenarios. These performance problems were > also reported on the mailing list recently [1]. > E.g. the PojoSerializer uses reflection for accessing the fields, which is > slow [2]. > For the complete proposal see [3]. 
> [1] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html > [2] > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369 > [3] > https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk
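The reflection cost referenced in [2] can be seen in miniature below. This is a sketch of the access pattern only, with a hypothetical Point class, not Flink's PojoSerializer: code generation replaces the reflective Field.get with the direct field read a hand-written serializer would use.

```java
import java.lang.reflect.Field;

// Sketch of the access pattern discussed above (hypothetical Point class):
// a PojoSerializer-style reflective read versus the direct read that
// generated serializer code would emit.
public class AccessDemo {
    public static class Point {
        public int x = 7;
    }

    // Reflective path, as the current PojoSerializer uses.
    static int readReflectively(Point p) {
        try {
            Field f = Point.class.getField("x");
            return (Integer) f.get(p);          // boxing + access checks on every read
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    // Direct path, as generated code would emit.
    static int readDirectly(Point p) {
        return p.x;                             // a plain field load
    }

    public static void main(String[] args) {
        Point p = new Point();
        System.out.println(readReflectively(p) == readDirectly(p)); // same value, very different cost
    }
}
```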
[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...
Github user Xazax-hun commented on a diff in the pull request: https://github.com/apache/flink/pull/2211#discussion_r78087961 --- Diff: flink-core/src/main/resources/PojoComparatorTemplate.ftl --- Janino ignores generics. If I wanted generics, I would need to validate the code with Java as well to be sure I got it right, so I decided not to bother with it for now.
[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers
[ https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474979#comment-15474979 ] ASF GitHub Bot commented on FLINK-3599: --- Github user Xazax-hun commented on a diff in the pull request: https://github.com/apache/flink/pull/2211#discussion_r78087704 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerGenerator.java --- @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField; +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.modifyStringForField; + +public final class PojoSerializerGenerator { + private static final String packageName = "org.apache.flink.api.java.typeutils.runtime.generated"; + + private final Class clazz; + private final Field[] refFields; + private final TypeSerializer[] fieldSerializers; + private final ExecutionConfig config; + private String code; + + public PojoSerializerGenerator( + Class clazz, + TypeSerializer[] fields, + Field[] reflectiveFields, + ExecutionConfig config) { + this.clazz = checkNotNull(clazz); + this.refFields = checkNotNull(reflectiveFields); + this.fieldSerializers = checkNotNull(fields); + this.config = checkNotNull(config); + for (int i = 0; i < this.refFields.length; i++) { + this.refFields[i].setAccessible(true); + } + } + + public TypeSerializer createSerializer() { + final String className = clazz.getCanonicalName().replace('.', '_') + "_GeneratedSerializer"; + final String fullClassName = packageName + "." 
+ className; + Class serializerClazz; + code = InstantiationUtil.getCodeForCachedClass(fullClassName); + if (code == null) { + generateCode(className); + } + if(config.isWrapGeneratedClassesEnabled()) { + return new GenTypeSerializerProxy<>(clazz, fullClassName, code, fieldSerializers, config); + } + try { + serializerClazz = InstantiationUtil.compile(clazz.getClassLoader(), fullClassName, code); + } + catch (Exception e) { + throw new RuntimeException("Unable to generate serializer: " + className, e); + } + Constructor[] ctors = serializerClazz.getConstructors(); + assert ctors.length == 1; + try { + return (TypeSerializer) ctors[0].newInstance(new Object[]{clazz, fieldSerializers, config}); + } catch (Exception e) { + throw new RuntimeException("Unable to instantiate serializer: " + className, e); + } + + } + + private void generateCode(String className) { + assert fieldSerializers.length > 0; + String typeName = clazz.getCanonicalName(); + StringBuilder members = new StringBuilder(); + for (int i = 0; i < fieldSerializers.length; ++i) { + members.append(String.format("final TypeSerializer f%d;\n", i)); + } + StringBuilder initMembers = new StringBuilder(); + for
[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...
Github user Xazax-hun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78087704

--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerGenerator.java ---
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
+import static org.apache.flink.api.java.typeutils.PojoTypeInfo.modifyStringForField;
+
+public final class PojoSerializerGenerator {
+    private static final String packageName = "org.apache.flink.api.java.typeutils.runtime.generated";
+
+    private final Class clazz;
+    private final Field[] refFields;
+    private final TypeSerializer[] fieldSerializers;
+    private final ExecutionConfig config;
+    private String code;
+
+    public PojoSerializerGenerator(
+            Class clazz,
+            TypeSerializer[] fields,
+            Field[] reflectiveFields,
+            ExecutionConfig config) {
+        this.clazz = checkNotNull(clazz);
+        this.refFields = checkNotNull(reflectiveFields);
+        this.fieldSerializers = checkNotNull(fields);
+        this.config = checkNotNull(config);
+        for (int i = 0; i < this.refFields.length; i++) {
+            this.refFields[i].setAccessible(true);
+        }
+    }
+
+    public TypeSerializer createSerializer() {
+        final String className = clazz.getCanonicalName().replace('.', '_') + "_GeneratedSerializer";
+        final String fullClassName = packageName + "." + className;
+        Class serializerClazz;
+        code = InstantiationUtil.getCodeForCachedClass(fullClassName);
+        if (code == null) {
+            generateCode(className);
+        }
+        if (config.isWrapGeneratedClassesEnabled()) {
+            return new GenTypeSerializerProxy<>(clazz, fullClassName, code, fieldSerializers, config);
+        }
+        try {
+            serializerClazz = InstantiationUtil.compile(clazz.getClassLoader(), fullClassName, code);
+        }
+        catch (Exception e) {
+            throw new RuntimeException("Unable to generate serializer: " + className, e);
+        }
+        Constructor[] ctors = serializerClazz.getConstructors();
+        assert ctors.length == 1;
+        try {
+            return (TypeSerializer) ctors[0].newInstance(new Object[]{clazz, fieldSerializers, config});
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to instantiate serializer: " + className, e);
+        }
+    }
+
+    private void generateCode(String className) {
+        assert fieldSerializers.length > 0;
+        String typeName = clazz.getCanonicalName();
+        StringBuilder members = new StringBuilder();
+        for (int i = 0; i < fieldSerializers.length; ++i) {
+            members.append(String.format("final TypeSerializer f%d;\n", i));
+        }
+        StringBuilder initMembers = new StringBuilder();
+        for (int i = 0; i < fieldSerializers.length; ++i) {
+            initMembers.append(String.format("f%d = serializerFields[%d];\n", i, i));
+        }
+        StringBuilder createFields = new StringBuilder();
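The point of this generator — replacing the reflective field access in the current PojoSerializer with compiled, direct access — can be illustrated with a minimal, self-contained sketch. The POJO and method names below are made up for illustration and are not part of the pull request:

```java
import java.lang.reflect.Field;

public class AccessSketch {
    // A hypothetical POJO standing in for a user type.
    public static class MyPojo {
        public int value;
        public MyPojo(int value) { this.value = value; }
    }

    // Reflective access, as the reflection-based serializer does per field.
    static int readReflectively(MyPojo p) {
        try {
            Field f = MyPojo.class.getField("value");
            return (int) f.get(p); // boxing + reflective dispatch on every call
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    // What generated code can emit instead: a direct field read.
    static int readDirectly(MyPojo p) {
        return p.value; // JIT-friendly, no boxing, no Field lookup
    }

    public static void main(String[] args) {
        MyPojo p = new MyPojo(42);
        System.out.println(readReflectively(p) == readDirectly(p)); // true
    }
}
```

Both paths read the same value; the generated form simply removes the per-call reflection cost that motivated FLINK-3599.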
[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers
[ https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474953#comment-15474953 ]

ASF GitHub Bot commented on FLINK-3599:
---

Github user Xazax-hun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78086011

--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
@@ -419,4 +481,61 @@ public String toString() {
         return "NamedFlatFieldDescriptor [name="+fieldName+" position="+getPosition()+" typeInfo="+getType()+"]";
     }
 }
+
+    public static String accessStringForField(Field f) {
+        String fieldName = f.getName();
+        if (Modifier.isPublic(f.getModifiers())) {
+            return fieldName;
+        }
+        String getterName = "get" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+        Class parentClazz = f.getDeclaringClass();
+        try {
+            parentClazz.getMethod(getterName, new Class[0]);
+        } catch (NoSuchMethodException e) {
+            // No getter, it might be a scala class.
+            return fieldName + "()";
+        }
+        return getterName + "()";
+    }
+
+    public static String modifyStringForField(Field f, String arg) {
+        String fieldName = f.getName();
+        if (Modifier.isPublic(f.getModifiers())) {
+            if (f.getType().isPrimitive()) {
+                return f.getName() + " = (" +
+                    primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + ")" + arg;
--- End diff --

Because arg usually refers to an object. I will add a comment to clarify this.

> GSoC: Code Generation in Serializers > > Key: FLINK-3599 > URL: https://issues.apache.org/jira/browse/FLINK-3599 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Márton Balassi >Assignee: Gabor Horvath > Labels: gsoc2016, mentor > > The current implementation of the serializers can be a > performance bottleneck in some scenarios. These performance problems were > also reported on the mailing list recently [1]. > E.g. the PojoSerializer uses reflection for accessing the fields, which is > slow [2]. > For the complete proposal see [3]. > [1] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html > [2] > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369 > [3] > https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...
Github user Xazax-hun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78086011

--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
@@ -419,4 +481,61 @@ public String toString() {
         return "NamedFlatFieldDescriptor [name="+fieldName+" position="+getPosition()+" typeInfo="+getType()+"]";
     }
 }
+
+    public static String accessStringForField(Field f) {
+        String fieldName = f.getName();
+        if (Modifier.isPublic(f.getModifiers())) {
+            return fieldName;
+        }
+        String getterName = "get" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
+        Class parentClazz = f.getDeclaringClass();
+        try {
+            parentClazz.getMethod(getterName, new Class[0]);
+        } catch (NoSuchMethodException e) {
+            // No getter, it might be a scala class.
+            return fieldName + "()";
+        }
+        return getterName + "()";
+    }
+
+    public static String modifyStringForField(Field f, String arg) {
+        String fieldName = f.getName();
+        if (Modifier.isPublic(f.getModifiers())) {
+            if (f.getType().isPrimitive()) {
+                return f.getName() + " = (" +
+                    primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + ")" + arg;
--- End diff --

Because arg usually refers to an object. I will add a comment to clarify this.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
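The review answer above ("arg usually refers to an object") is the crux of the boxed cast in the generated assignment: the incoming value is Object-typed, so the code casts to the boxed wrapper and lets auto-unboxing store it into the primitive field. A minimal sketch of that mechanism (the method name is illustrative, not from the patch):

```java
public class BoxedCastSketch {
    // What the generated assignment does for a primitive int field:
    // cast the Object to the boxed type, then let auto-unboxing do the rest.
    static int assignFromObject(Object arg) {
        int field = (Integer) arg; // (int) arg directly would be a compile error pre-Java 7
        return field;
    }

    public static void main(String[] args) {
        Object boxed = Integer.valueOf(123);
        System.out.println(assignFromObject(boxed)); // 123
    }
}
```

Casting straight to the primitive type would fail if `arg` is statically typed as `Object` on older source levels, which is why the generated code spells out the boxed class name.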
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474575#comment-15474575 ] Gabor Gevay commented on FLINK-3322: I've just realized that what I wrote above doesn't cover the resetting of the local strategies. For that, we could add a {{ResettableInputProvider}} interface that extends {{CloseableInputProvider}} (or even just add the reset method directly to {{CloseableInputProvider}}), make the Sorters implement it, and call this new {{reset}} method in {{resetAllInputs}} (instead of calling {{close}} and then recreating them). (But this can go into a separate pull request later.) > MemoryManager creates too much GC pressure with iterative jobs > -- > > Key: FLINK-3322 > URL: https://issues.apache.org/jira/browse/FLINK-3322 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Priority: Critical > Fix For: 1.0.0 > > Attachments: FLINK-3322.docx > > > When taskmanager.memory.preallocate is false (the default), released memory > segments are not added to a pool, but the GC is expected to take care of > them. This puts too much pressure on the GC with iterative jobs, where the > operators reallocate all memory at every superstep. > See the following discussion on the mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html > Reproducing the issue: > https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc > The class to start is malom.Solver. If you increase the memory given to the > JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. > (It will generate some lookuptables to /tmp on first run for a few minutes.) > (I think the slowdown might also depend somewhat on > taskmanager.memory.fraction, because more unused non-managed memory results > in rarer GCs.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
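The interface proposed in the comment above could look roughly like the following toy sketch. Only the names {{ResettableInputProvider}}, {{CloseableInputProvider}}, and {{reset}} come from the comment; the sorter stand-in and its buffer are made up for illustration:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ResettableSketch {
    // Stand-in for Flink's CloseableInputProvider.
    interface CloseableInputProvider extends Closeable {}

    // The proposed extension: clear state for the next superstep
    // while keeping the already-acquired memory.
    interface ResettableInputProvider extends CloseableInputProvider {
        void reset();
    }

    // A toy "sorter" that keeps its buffer across resets instead of
    // releasing it on close() and reallocating each superstep.
    static class ToySorter implements ResettableInputProvider {
        final List<Integer> buffer = new ArrayList<>();

        void add(int v) { buffer.add(v); }

        @Override
        public void reset() { buffer.clear(); } // memory retained, no GC garbage

        @Override
        public void close() throws IOException { /* release resources for real */ }
    }

    public static void main(String[] args) {
        ToySorter sorter = new ToySorter();
        sorter.add(1);
        sorter.reset();
        System.out.println(sorter.buffer.size()); // 0, but the buffer object survives
    }
}
```

Calling {{reset}} in {{resetAllInputs}} instead of close-and-recreate is exactly what avoids handing released segments to the garbage collector every superstep.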
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474531#comment-15474531 ] Gabor Gevay commented on FLINK-3322: I had an offline chat with [~StephanEwen], and an alternative design came up, which we should also consider: There is the {{ResettableDriver}} interface, which is implemented by those drivers that need to retain some state between iteration steps. The way {{AbstractIterativeTask}} uses this interface is that it checks whether the driver is an instance of it, and if so it doesn't destroy and recreate the driver between iteration steps, but instead just calls {{reset}} on it. We could make every driver implement this interface, and in their {{reset}} method they would simply hold on to the memory they already have. When this is done, it would also allow some simplification by eliminating the special-case handling that distinguishes between resettable and non-resettable operators in lots of different places. Since touching every driver probably can't be avoided anyway, this looks like the cleanest solution. > MemoryManager creates too much GC pressure with iterative jobs > -- > > Key: FLINK-3322 > URL: https://issues.apache.org/jira/browse/FLINK-3322 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Priority: Critical > Fix For: 1.0.0 > > Attachments: FLINK-3322.docx > > > When taskmanager.memory.preallocate is false (the default), released memory > segments are not added to a pool, but the GC is expected to take care of > them. This puts too much pressure on the GC with iterative jobs, where the > operators reallocate all memory at every superstep. 
> See the following discussion on the mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html > Reproducing the issue: > https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc > The class to start is malom.Solver. If you increase the memory given to the > JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. > (It will generate some lookuptables to /tmp on first run for a few minutes.) > (I think the slowdown might also depend somewhat on > taskmanager.memory.fraction, because more unused non-managed memory results > in rarer GCs.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
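The alternative design discussed for FLINK-3322 — every driver implementing a reset method that holds on to its memory between iteration steps instead of releasing and reallocating — can be sketched as follows. The driver class and its byte-array "segments" are stand-ins for illustration, not actual Flink classes:

```java
import java.util.Arrays;

public class DriverResetSketch {
    // Stand-in for the ResettableDriver contract from the discussion.
    interface ResettableDriver {
        void reset(); // called between iteration steps instead of teardown
    }

    static class ToyDriver implements ResettableDriver {
        // Stand-in for managed memory segments the driver already holds.
        final byte[][] segments;

        ToyDriver(int numSegments, int segmentSize) {
            segments = new byte[numSegments][segmentSize];
        }

        @Override
        public void reset() {
            // Hold on to the segments; just wipe them for the next superstep.
            for (byte[] seg : segments) {
                Arrays.fill(seg, (byte) 0);
            }
        }
    }

    public static void main(String[] args) {
        ToyDriver d = new ToyDriver(4, 1024);
        byte[] before = d.segments[0];
        d.reset();
        System.out.println(before == d.segments[0]); // true: no reallocation
    }
}
```

Because the same segment objects survive every superstep, nothing becomes garbage, which is precisely what relieves the GC pressure described in the ticket.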
[jira] [Commented] (FLINK-1526) Add Minimum Spanning Tree library method and example
[ https://issues.apache.org/jira/browse/FLINK-1526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474504#comment-15474504 ] Olga Golovneva commented on FLINK-1526: --- Hi Greg, I didn't find many details about this issue either; here is what I have: http://mail-archives.apache.org/mod_mbox/flink-dev/201504.mbox/%3ccajz2dcugxrxsdxh_rq4dr52ego+out5oz+5n5xm2an5psrx...@mail.gmail.com%3e I suppose the problem was in data caching within a loop. Thanks! > Add Minimum Spanning Tree library method and example > > > Key: FLINK-1526 > URL: https://issues.apache.org/jira/browse/FLINK-1526 > Project: Flink > Issue Type: Task > Components: Gelly >Reporter: Vasia Kalavri > > This issue proposes the addition of a library method and an example for > distributed minimum spanning tree in Gelly. > The DMST algorithm is very interesting because it is quite different from > PageRank-like iterative graph algorithms. It consists of distinct phases > inside the same iteration and requires a mechanism to detect convergence of > one phase to proceed to the next one. Current implementations in > vertex-centric models are quite long (>1000 lines) and hard to understand. > You can find a description of the algorithm [here | > http://ilpubs.stanford.edu:8090/1077/3/p535-salihoglu.pdf] and [here | > http://www.vldb.org/pvldb/vol7/p1047-han.pdf]. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4522) Gelly link broken in homepage
[ https://issues.apache.org/jira/browse/FLINK-4522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474418#comment-15474418 ] Greg Hogan commented on FLINK-4522: --- I am only seeing this affect version 1.2. The 1.1 docs have a single page of Gelly documentation. > Gelly link broken in homepage > - > > Key: FLINK-4522 > URL: https://issues.apache.org/jira/browse/FLINK-4522 > Project: Flink > Issue Type: Bug > Components: Documentation, Gelly >Affects Versions: 1.1.0, 1.1.1 >Reporter: Vasia Kalavri >Assignee: Greg Hogan > > The link to the Gelly documentation is broken in the Flink homepage. The link > points to "docs/apis/batch/libs/gelly.md" which has been removed. Since this > link might be present in other places as well, e.g. slides, trainings, etc., > we should re-direct to the new location of the Gelly docs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-4585) Fix broken links in index.md
[ https://issues.apache.org/jira/browse/FLINK-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander Pivovarov closed FLINK-4585. -- > Fix broken links in index.md > > > Key: FLINK-4585 > URL: https://issues.apache.org/jira/browse/FLINK-4585 > Project: Flink > Issue Type: Bug > Components: Project Website >Reporter: Alexander Pivovarov >Priority: Minor > > The following links are broken > DataSet API > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html > correct link: > https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html > Table API > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/table.html > correct link: > https://ci.apache.org/projects/flink/flink-docs-master/dev/table_api.html > Gelly > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/gelly.html > correct link: > https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/gelly/index.html > The following links show "Page 'X' Has Moved to" for 1-2 sec and then > redirect to another page > DataStream API > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html > redirects-to: > https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html > programming guide > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html > redirects-to DataSet API: > https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html > probably it should be "Basic API Concepts" > https://ci.apache.org/projects/flink/flink-docs-master/dev/api_concepts.html > or Quick Start - > https://ci.apache.org/projects/flink/flink-docs-master/quickstart/setup_quickstart.html > CEP > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html > redirects-to: > https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html > ML > link: > 
http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/ml/index.html > redirects-to: > https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/ml/index.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4591) Select star does not work with grouping
[ https://issues.apache.org/jira/browse/FLINK-4591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474335#comment-15474335 ] Timo Walther commented on FLINK-4591: - Just because it is not allowed in SQL does not mean that we shouldn't allow it in the Table API. But at least the star should be supported in a {{select}} after a {{groupBy}}. > Select star does not work with grouping > --- > > Key: FLINK-4591 > URL: https://issues.apache.org/jira/browse/FLINK-4591 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther > > It would be consistent if this would also work: > {{table.groupBy('*).select('*)}} > Currently, the star only works in a plain select without grouping. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4550) Clearly define SQL operator table
[ https://issues.apache.org/jira/browse/FLINK-4550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-4550: Labels: starter (was: ) > Clearly define SQL operator table > - > > Key: FLINK-4550 > URL: https://issues.apache.org/jira/browse/FLINK-4550 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther > Labels: starter > > Currently, we use {{SqlStdOperatorTable.instance()}} for setting all > supported operations. However, not all of them are actually supported. > {{FunctionCatalog}} should only return those operators that are tested and > documented. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3656) Rework Table API tests
[ https://issues.apache.org/jira/browse/FLINK-3656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-3656: Labels: starter (was: ) > Rework Table API tests > -- > > Key: FLINK-3656 > URL: https://issues.apache.org/jira/browse/FLINK-3656 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Vasia Kalavri > Labels: starter > > The Table API tests are very inefficient. At the moment It is mostly > end-to-end integration tests, often testing the same functionality several > times (Java/Scala, DataSet/DataStream). > We should look into how we can rework the Table API tests such that: > - long-running integration tests are converted into faster unit tests > - common parts of DataSet and DataStream are only tested once > - common parts of Java and Scala Table APIs are only tested once > - duplicate tests are completely removed -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4592) Fix flaky test ScalarFunctionsTest.testCurrentTimePoint
[ https://issues.apache.org/jira/browse/FLINK-4592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-4592: Labels: starter (was: ) > Fix flaky test ScalarFunctionsTest.testCurrentTimePoint > --- > > Key: FLINK-4592 > URL: https://issues.apache.org/jira/browse/FLINK-4592 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Timo Walther > Labels: starter > > It seems that the test is still non deterministic. > {code} > org.apache.flink.api.table.expressions.ScalarFunctionsTest > testCurrentTimePoint(org.apache.flink.api.table.expressions.ScalarFunctionsTest) > Time elapsed: 0.083 sec <<< FAILURE! > org.junit.ComparisonFailure: Wrong result for: > AS(>=(CHAR_LENGTH(CAST(CURRENT_TIMESTAMP):VARCHAR(1) CHARACTER SET > "ISO-8859-1" COLLATE "ISO-8859-1$en_US$primary" NOT NULL), 22), '_c0') > expected:<[tru]e> but was:<[fals]e> > at org.junit.Assert.assertEquals(Assert.java:115) > at > org.apache.flink.api.table.expressions.utils.ExpressionTestBase$$anonfun$evaluateExprs$1.apply(ExpressionTestBase.scala:126) > at > org.apache.flink.api.table.expressions.utils.ExpressionTestBase$$anonfun$evaluateExprs$1.apply(ExpressionTestBase.scala:123) > at > scala.collection.mutable.LinkedHashSet.foreach(LinkedHashSet.scala:87) > at > org.apache.flink.api.table.expressions.utils.ExpressionTestBase.evaluateExprs(ExpressionTestBase.scala:123) > at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33) > at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) > at org.junit.runners.ParentRunner.run(ParentRunner.java:309) > at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153) > at > org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128) > at > org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203) > at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155) > at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
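One plausible source of the non-determinism in {{testCurrentTimePoint}} (an assumption, not confirmed by the ticket): the failing assertion checks that the rendered {{CURRENT_TIMESTAMP}} is at least 22 characters long, but JDBC-style timestamp strings trim trailing zeros in the fractional seconds, so the rendered length depends on the wall-clock value at test time:

```java
import java.sql.Timestamp;

public class TimestampLengthSketch {
    // Length of the JDBC escape rendering "yyyy-mm-dd hh:mm:ss.f..."
    // for a given epoch-millisecond value.
    static int renderedLength(long epochMillis) {
        return new Timestamp(epochMillis).toString().length();
    }

    public static void main(String[] args) {
        // Same second, different fractional parts: toString() trims
        // trailing zeros but always keeps at least one fractional digit.
        System.out.println(renderedLength(1473336000000L)); // 21, ends in ".0"
        System.out.println(renderedLength(1473336000123L)); // 23, ends in ".123"
    }
}
```

A test asserting `CHAR_LENGTH(...) >= 22` would therefore pass or fail depending on whether the current millisecond value happens to produce a short fractional suffix, which matches a flaky, time-dependent failure.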
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474327#comment-15474327 ] Timo Walther commented on FLINK-4565: - I'm very sorry. After thinking about it, maybe I underestimated this issue. It is the first combination of expression and table, which makes this issue tricky. Yes, the validation is very complicated, especially because we have two validations, one for SQL and one for the Table API (with heavy Scala magic). If you still want to do that, I can also help you in a private chat. Otherwise, e.g. FLINK-4599 would be easier. I will add the "starter" label to easier tasks. > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Simone Robutti > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4599) Add 'explain()' also to StreamTableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-4599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-4599: Labels: starter (was: ) > Add 'explain()' also to StreamTableEnvironment > -- > > Key: FLINK-4599 > URL: https://issues.apache.org/jira/browse/FLINK-4599 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther > Labels: starter > > Currently, only the BatchTableEnvironment supports the {{explain}} command > for tables. We should also support it for the StreamTableEnvironment. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2275 We will have to fix the issue before we can merge this PR. The timeout doesn't occur in non-secure Kafka tests. I think we need to increase the timeout or change the test layout. Could you look into that?
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474208#comment-15474208 ] ASF GitHub Bot commented on FLINK-3929: --- Github user mxm commented on the issue: https://github.com/apache/flink/pull/2275 We will have to fix the issue before we can merge this PR. The timeout doesn't occur in non-secure Kafka tests. I think we need to increase the timeout or change the test layout. Could you look into that? > Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Add support for a keytab credential to be associated with the Flink cluster, > to facilitate: > - Kerberos-authenticated data access for connectors > - Kerberos-authenticated ZooKeeper access > Support both the standalone and YARN deployment modes. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4456) Replace ActorGateway in Task by interface
[ https://issues.apache.org/jira/browse/FLINK-4456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474159#comment-15474159 ] ASF GitHub Bot commented on FLINK-4456: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2456 > Replace ActorGateway in Task by interface > - > > Key: FLINK-4456 > URL: https://issues.apache.org/jira/browse/FLINK-4456 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{Task}} communicates with the outside world ({{JobManager}} and > {{TaskManager}}) via {{ActorGateways}}. This bakes in the dependency on > actors. > In terms of modularization and an improved abstraction (especially wrt > Flip-6) I propose to replace the {{ActorGateways}} by interfaces which > expose the required methods. The current implementation would then simply > wrap the method calls in messages and send them via the {{ActorGateway}} to > the recipient. > In Flip-6 the {{JobMaster}} could simply implement these interfaces as part > of their RPC contract. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2456: [FLINK-4456] Replace ActorGateway in Task and Runt...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2456
[jira] [Closed] (FLINK-4456) Replace ActorGateway in Task by interface
[ https://issues.apache.org/jira/browse/FLINK-4456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-4456. Resolution: Fixed Fixed via 0735b5b935b0c0757943e2d58047afcfb9949560 > Replace ActorGateway in Task by interface > - > > Key: FLINK-4456 > URL: https://issues.apache.org/jira/browse/FLINK-4456 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{Task}} communicates with the outside world ({{JobManager}} and > {{TaskManager}}) via {{ActorGateways}}. This bakes in the dependency on > actors. > In terms of modularization and an improved abstraction (especially wrt > Flip-6) I propose to replace the {{ActorGateways}} by interfaces which > expose the required methods. The current implementation would then simply > wrap the method calls in messages and send them via the {{ActorGateway}} to > the recipient. > In Flip-6 the {{JobMaster}} could simply implement these interfaces as part > of their RPC contract. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
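The pattern described in FLINK-4456 — the Task depending on an interface that exposes the required methods, with the actor-based implementation wrapping each call into a message sent via the {{ActorGateway}} — can be sketched with stand-in names. The interface, method, and message names here are illustrative, not the actual Flink API:

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class GatewaySketch {
    // The interface the Task would depend on instead of an ActorGateway.
    interface TaskManagerActions {
        void notifyFinalState(String taskId);
    }

    // Actor-backed implementation: wraps the call into a message and
    // "sends" it; a real one would tell the ActorGateway instead.
    static class ActorBackedActions implements TaskManagerActions {
        final Queue<String> mailbox = new ArrayDeque<>(); // stand-in for the actor

        @Override
        public void notifyFinalState(String taskId) {
            mailbox.add("TaskFinalState(" + taskId + ")");
        }
    }

    public static void main(String[] args) {
        TaskManagerActions actions = new ActorBackedActions();
        actions.notifyFinalState("task-1"); // the Task never sees an actor
    }
}
```

Under Flip-6 the JobMaster can implement the same interface directly as part of its RPC contract, so the Task code stays unchanged while the actor dependency disappears.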
[jira] [Closed] (FLINK-4458) Remove ForkableFlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-4458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-4458. Resolution: Fixed Fixed via 02b852e3571e46f25fdfc79f43ceb726ddff9ba7 > Remove ForkableFlinkMiniCluster > --- > > Key: FLINK-4458 > URL: https://issues.apache.org/jira/browse/FLINK-4458 > Project: Flink > Issue Type: Improvement > Components: Tests >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > After addressing FLINK-4424 we should be able to get rid of the > {{ForkableFlinkMiniCluster}} since we no longer have to pre-determine a port > in Flink. Thus, by setting the ports to {{0}} and letting the OS choose a > free port, there should no longer be conflicting port requests. Consequently, > the {{ForkableFlinkMiniCluster}} will become obsolete. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
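The port-0 trick FLINK-4458 relies on is standard: binding to port 0 asks the OS for any free ephemeral port, so parallel test forks no longer collide on pre-determined ports. A minimal sketch:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

public class EphemeralPortSketch {
    // Bind to port 0 and let the OS assign a free ephemeral port.
    static int bindToFreePort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort(); // the concrete port the OS picked
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(bindToFreePort() > 0); // true
    }
}
```

With every component doing this, no fork needs to reserve a port range up front, which is what made the {{ForkableFlinkMiniCluster}} obsolete.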
[jira] [Commented] (FLINK-4458) Remove ForkableFlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-4458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474137#comment-15474137 ] ASF GitHub Bot commented on FLINK-4458: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2450 > Remove ForkableFlinkMiniCluster > --- > > Key: FLINK-4458 > URL: https://issues.apache.org/jira/browse/FLINK-4458 > Project: Flink > Issue Type: Improvement > Components: Tests >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > After addressing FLINK-4424 we should be able to get rid of the > {{ForkableFlinkMiniCluster}} since we no longer have to pre-determine a port > in Flink. Thus, by setting the ports to {{0}} and letting the OS choose a > free port, there should no longer be conflicting port requests. Consequently, > the {{ForkableFlinkMiniCluster}} will become obsolete. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2450: [FLINK-4458] Replace ForkableFlinkMiniCluster by L...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2450 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474082#comment-15474082 ] Simone Robutti commented on FLINK-4565: --- After a couple of days of work I believe this task is a bit too complex for me. I'm struggling with the complexity of constructing the RelNode and doing the validation. Lacking an example similar enough to take inspiration from, it's hard to work on this by myself. I imagine this would take no more than a couple of hours for someone with experience with Flink and Calcite, but despite everything I've learnt over these days I'm seeing no progress. My biggest problem is that I cannot really wrap my head around many of the exceptions thrown during construction and validation. If someone is patient enough to go through what I've done so far, I can open a WIP PR or link to my repo, pointing out all the parts that confuse me. Otherwise I can leave this issue to someone more experienced and retry with something easier and more straightforward. > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Simone Robutti > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested.
[jira] [Commented] (FLINK-4572) Convert to negative in LongValueToIntValue
[ https://issues.apache.org/jira/browse/FLINK-4572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474038#comment-15474038 ] ASF GitHub Bot commented on FLINK-4572: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2469 I split the long to int translator into both signed and unsigned translators so the conversion would not be ambiguous. The test will fail without the fix in FLINK-4594. > Convert to negative in LongValueToIntValue > -- > > Key: FLINK-4572 > URL: https://issues.apache.org/jira/browse/FLINK-4572 > Project: Flink > Issue Type: Bug > Components: Gelly >Affects Versions: 1.2.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > The Gelly drivers expect that scale 32 edges, represented by the lower 32 > bits of {{long}} values, can be converted to {{int}} values. Values between > 2^31 and 2^32 - 1 should be converted to negative integers, which is not > supported by {{MathUtils.checkedDownCast}}.
[GitHub] flink issue #2469: [FLINK-4572] [gelly] Convert to negative in LongValueToIn...
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2469 I split the long to int translator into both signed and unsigned translators so the conversion would not be ambiguous. The test will fail without the fix in FLINK-4594.
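The signed/unsigned distinction discussed in this ticket is easy to see with the boundary values it mentions. The sketch below is illustrative (two standalone static methods, not Gelly's actual translator classes): the "unsigned" translation simply keeps the lower 32 bits, so values in [2^31, 2^32 - 1] become negative ints, whereas a checked downcast refuses them.

```java
public class LongToIntDemo {
    // Checked downcast: rejects any long outside the int range, including
    // values in [2^31, 2^32 - 1] even though their lower 32 bits are usable.
    static int checkedDownCast(long value) {
        if ((int) value != value) {
            throw new IllegalArgumentException("Cannot downcast " + value + " to int");
        }
        return (int) value;
    }

    // "Unsigned" translation: keep the lower 32 bits, accepting that values
    // between 2^31 and 2^32 - 1 map to negative ints.
    static int unsignedDownCast(long value) {
        return (int) value;
    }

    public static void main(String[] args) {
        long v = (1L << 32) - 1;                 // 4294967295, all lower 32 bits set
        System.out.println(unsignedDownCast(v)); // prints -1
        try {
            checkedDownCast(v);
        } catch (IllegalArgumentException e) {
            System.out.println("checked downcast rejected " + v);
        }
    }
}
```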
[jira] [Commented] (FLINK-4594) Validate lower bound in MathUtils.checkedDownCast
[ https://issues.apache.org/jira/browse/FLINK-4594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15474029#comment-15474029 ] ASF GitHub Bot commented on FLINK-4594: --- GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/2481 [FLINK-4594] [core] Validate lower bound in MathUtils.checkedDownCast You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 4594_validate_lower_bound_in_mathutils_checkeddowncast Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2481.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2481 commit d273919155cfef48d807b7f93430b52944e145f4 Author: Greg Hogan Date: 2016-09-08T14:35:39Z [FLINK-4594] [core] Validate lower bound in MathUtils.checkedDownCast > Validate lower bound in MathUtils.checkedDownCast > - > > Key: FLINK-4594 > URL: https://issues.apache.org/jira/browse/FLINK-4594 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.2.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Trivial > > {{MathUtils.checkedDownCast}} only compares against the upper bound > {{Integer.MAX_VALUE}}, which has worked with current usage. > Rather than adding a second comparison we can replace > {noformat} > if (value > Integer.MAX_VALUE) { > {noformat} > with a cast and check > {noformat} > if ((int)value != value) { ... > {noformat}
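The bug and the proposed fix can be verified directly. This is a hedged sketch with standalone methods mirroring the two checks quoted in the ticket, not the actual `MathUtils` code: the upper-bound-only check silently accepts longs below `Integer.MIN_VALUE`, while the cast-and-compare check rejects both out-of-range directions.

```java
public class CheckedDownCastDemo {
    // Old check: guards only the upper bound, so a large negative long slips through.
    static boolean upperBoundOnlyAccepts(long value) {
        return !(value > Integer.MAX_VALUE);
    }

    // Proposed check: cast and compare, rejecting values out of range on either side.
    static boolean castAndCompareAccepts(long value) {
        return (int) value == value;
    }

    public static void main(String[] args) {
        long tooSmall = (long) Integer.MIN_VALUE - 1; // -2147483649
        System.out.println(upperBoundOnlyAccepts(tooSmall));  // true: the bug
        System.out.println(castAndCompareAccepts(tooSmall));  // false: correctly rejected
        System.out.println(castAndCompareAccepts(42L));       // true: in range
    }
}
```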
[GitHub] flink pull request #2481: [FLINK-4594] [core] Validate lower bound in MathUt...
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/2481 [FLINK-4594] [core] Validate lower bound in MathUtils.checkedDownCast You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 4594_validate_lower_bound_in_mathutils_checkeddowncast Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2481.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2481 commit d273919155cfef48d807b7f93430b52944e145f4 Author: Greg Hogan Date: 2016-09-08T14:35:39Z [FLINK-4594] [core] Validate lower bound in MathUtils.checkedDownCast
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473966#comment-15473966 ] Timo Walther commented on FLINK-4565: - Yes, this would fail then. But I think there is no other good solution if you specify expressions as Strings as we do in the Java API.
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473867#comment-15473867 ] Simone Robutti commented on FLINK-4565: --- I'm no expert in the Table API, but if a user doesn't explicitly register a table in the table registry, will this fail?
[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2211#discussion_r78003840 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerGenerator.java --- @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField; +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.modifyStringForField; + +public final class PojoSerializerGenerator { + private static final String packageName = "org.apache.flink.api.java.typeutils.runtime.generated"; + + private final Class clazz; + private final Field[] refFields; + private final TypeSerializer[] fieldSerializers; + private final ExecutionConfig config; + private String code; + + public PojoSerializerGenerator( + Class clazz, + TypeSerializer[] fields, + Field[] reflectiveFields, + ExecutionConfig config) { + this.clazz = checkNotNull(clazz); + this.refFields = checkNotNull(reflectiveFields); + this.fieldSerializers = checkNotNull(fields); + this.config = checkNotNull(config); + for (int i = 0; i < this.refFields.length; i++) { + this.refFields[i].setAccessible(true); + } + } + + public TypeSerializer createSerializer() { + final String className = clazz.getCanonicalName().replace('.', '_') + "_GeneratedSerializer"; + final String fullClassName = packageName + "." 
+ className; + Class serializerClazz; + code = InstantiationUtil.getCodeForCachedClass(fullClassName); + if (code == null) { + generateCode(className); + } + if(config.isWrapGeneratedClassesEnabled()) { + return new GenTypeSerializerProxy<>(clazz, fullClassName, code, fieldSerializers, config); + } + try { + serializerClazz = InstantiationUtil.compile(clazz.getClassLoader(), fullClassName, code); + } + catch (Exception e) { + throw new RuntimeException("Unable to generate serializer: " + className, e); + } + Constructor[] ctors = serializerClazz.getConstructors(); + assert ctors.length == 1; + try { + return (TypeSerializer) ctors[0].newInstance(new Object[]{clazz, fieldSerializers, config}); + } catch (Exception e) { + throw new RuntimeException("Unable to instantiate serializer: " + className, e); + } + + } + + private void generateCode(String className) { + assert fieldSerializers.length > 0; + String typeName = clazz.getCanonicalName(); + StringBuilder members = new StringBuilder(); + for (int i = 0; i < fieldSerializers.length; ++i) { + members.append(String.format("final TypeSerializer f%d;\n", i)); + } + StringBuilder initMembers = new StringBuilder(); + for (int i = 0; i < fieldSerializers.length; ++i) { + initMembers.append(String.format("f%d = serializerFields[%d];\n", i, i)); + } + StringBuilder createFields = new StringBuilder(); +
[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers
[ https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473854#comment-15473854 ] ASF GitHub Bot commented on FLINK-3599: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2211#discussion_r78003840 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerGenerator.java ---
[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers
[ https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473852#comment-15473852 ] ASF GitHub Bot commented on FLINK-3599: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2211#discussion_r78003691 --- Diff: flink-core/src/main/resources/PojoSerializerTemplate.ftl --- @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package ${packageName}; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +public final class ${className} extends TypeSerializer { + private static byte IS_NULL = 1; + private static byte NO_SUBCLASS = 2; + private static byte IS_SUBCLASS = 4; + private static byte IS_TAGGED_SUBCLASS = 8; + private int numFields; + private ExecutionConfig executionConfig; + private MapsubclassSerializerCache; + private final Map registeredClasses; + private final TypeSerializer[] registeredSerializers; + Class clazz; + <#list members as m> + ${m} + + public ${className}(Class clazz, TypeSerializer[] serializerFields, ExecutionConfig e) { + this.clazz = clazz; + executionConfig = e; + this.numFields = serializerFields.length; + LinkedHashSet registeredPojoTypes = executionConfig.getRegisteredPojoTypes(); + subclassSerializerCache = new HashMap (); + List cleanedTaggedClasses = new ArrayList(registeredPojoTypes.size()); + for (Class registeredClass: registeredPojoTypes) { + if (registeredClass.equals(clazz)) { + continue; + } + if (!clazz.isAssignableFrom(registeredClass)) { + continue; + } + cleanedTaggedClasses.add(registeredClass); + } + this.registeredClasses = new LinkedHashMap (cleanedTaggedClasses.size()); + registeredSerializers = new TypeSerializer[cleanedTaggedClasses.size()]; + int id = 0; + for (Class registeredClass: 
cleanedTaggedClasses) { + this.registeredClasses.put(registeredClass, id); + TypeInformation typeInfo = TypeExtractor.createTypeInfo(registeredClass); + registeredSerializers[id] = typeInfo.createSerializer(executionConfig); + id++; + } + <#list initMembers as m> + ${m} + + } + private TypeSerializer getSubclassSerializer(Class subclass) { + TypeSerializer result = (TypeSerializer)subclassSerializerCache.get(subclass); + if (result == null) { + TypeInformation typeInfo = TypeExtractor.createTypeInfo(subclass); + result = typeInfo.createSerializer(executionConfig); + subclassSerializerCache.put(subclass, result); + } + return result; + } + public boolean isImmutableType() { return false; } + public ${className} duplicate() { + boolean
[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2211#discussion_r78003691 --- Diff: flink-core/src/main/resources/PojoSerializerTemplate.ftl ---
[jira] [Commented] (FLINK-1526) Add Minimum Spanning Tree library method and example
[ https://issues.apache.org/jira/browse/FLINK-1526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473850#comment-15473850 ] Greg Hogan commented on FLINK-1526: --- Hi Olga, do you have a reference for the for-loop iteration issue as it pertains to this algorithm? There wasn't much discussion on this ticket or associated pull request. Is the intent to use iterations or a loop with data caching? > Add Minimum Spanning Tree library method and example > > > Key: FLINK-1526 > URL: https://issues.apache.org/jira/browse/FLINK-1526 > Project: Flink > Issue Type: Task > Components: Gelly >Reporter: Vasia Kalavri > > This issue proposes the addition of a library method and an example for > distributed minimum spanning tree in Gelly. > The DMST algorithm is very interesting because it is quite different from > PageRank-like iterative graph algorithms. It consists of distinct phases > inside the same iteration and requires a mechanism to detect convergence of > one phase to proceed to the next one. Current implementations in > vertex-centric models are quite long (>1000 lines) and hard to understand. > You can find a description of the algorithm [here | > http://ilpubs.stanford.edu:8090/1077/3/p535-salihoglu.pdf] and [here | > http://www.vldb.org/pvldb/vol7/p1047-han.pdf].
[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers
[ https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473836#comment-15473836 ] ASF GitHub Bot commented on FLINK-3599: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2211#discussion_r78002589 --- Diff: flink-core/src/main/resources/PojoSerializerTemplate.ftl ---
[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78002589

--- Diff: flink-core/src/main/resources/PojoSerializerTemplate.ftl ---
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${packageName};
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public final class ${className} extends TypeSerializer {
+    private static byte IS_NULL = 1;
+    private static byte NO_SUBCLASS = 2;
+    private static byte IS_SUBCLASS = 4;
+    private static byte IS_TAGGED_SUBCLASS = 8;
+    private int numFields;
+    private ExecutionConfig executionConfig;
+    private Map subclassSerializerCache;
+    private final Map registeredClasses;
+    private final TypeSerializer[] registeredSerializers;
+    Class clazz;
+    <#list members as m>
+    ${m}
+    </#list>
+    public ${className}(Class clazz, TypeSerializer[] serializerFields, ExecutionConfig e) {
+        this.clazz = clazz;
+        executionConfig = e;
+        this.numFields = serializerFields.length;
+        LinkedHashSet registeredPojoTypes = executionConfig.getRegisteredPojoTypes();
+        subclassSerializerCache = new HashMap();
+        List cleanedTaggedClasses = new ArrayList(registeredPojoTypes.size());
+        for (Class registeredClass: registeredPojoTypes) {
+            if (registeredClass.equals(clazz)) {
+                continue;
+            }
+            if (!clazz.isAssignableFrom(registeredClass)) {
+                continue;
+            }
+            cleanedTaggedClasses.add(registeredClass);
+        }
+        this.registeredClasses = new LinkedHashMap(cleanedTaggedClasses.size());
+        registeredSerializers = new TypeSerializer[cleanedTaggedClasses.size()];
+        int id = 0;
+        for (Class registeredClass: cleanedTaggedClasses) {
+            this.registeredClasses.put(registeredClass, id);
+            TypeInformation typeInfo = TypeExtractor.createTypeInfo(registeredClass);
+            registeredSerializers[id] = typeInfo.createSerializer(executionConfig);
+            id++;
+        }
+        <#list initMembers as m>
+        ${m}
+        </#list>
+    }
+    private TypeSerializer getSubclassSerializer(Class subclass) {
+        TypeSerializer result = (TypeSerializer) subclassSerializerCache.get(subclass);
+        if (result == null) {
+            TypeInformation typeInfo = TypeExtractor.createTypeInfo(subclass);
+            result = typeInfo.createSerializer(executionConfig);
+            subclassSerializerCache.put(subclass, result);
+        }
+        return result;
+    }
+    public boolean isImmutableType() { return false; }
+    public ${className} duplicate() {
+        boolean stateful = false;
+        TypeSerializer[] duplicateFieldSerializers = new TypeSerializer[numFields];
+        <#list duplicateSerializers as ds>
+        ${ds}
+        </#list>
+        if (stateful) {
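The `getSubclassSerializer` helper in the template above is a lazy per-subclass cache: a serializer is created on the first lookup for a concrete subclass and reused afterwards. A self-contained sketch of that pattern, with a plain `Function` standing in for the `TypeExtractor.createTypeInfo(...)`/`createSerializer(...)` pair:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch of the lazy per-subclass cache used by getSubclassSerializer in the
// template above; the Function stands in for TypeExtractor/createSerializer.
final class SubclassCache<V> {
    private final Map<Class<?>, V> cache = new HashMap<>();
    private final Function<Class<?>, V> factory;

    SubclassCache(Function<Class<?>, V> factory) {
        this.factory = factory;
    }

    V get(Class<?> subclass) {
        // computeIfAbsent invokes the factory only on the first lookup per class
        return cache.computeIfAbsent(subclass, factory);
    }
}
```

Creating the serializer once per subclass matters here because `TypeExtractor.createTypeInfo` is reflection-heavy, which is exactly the overhead this code-generation work is trying to avoid.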
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473833#comment-15473833 ] Timo Walther commented on FLINK-4565:
---

I think it is OK if you just parse the table name and introduce an `UnresolvedTable` expression, which can later be resolved/looked up in the table registry.

> Support for SQL IN operator
> ---------------------------
>
> Key: FLINK-4565
> URL: https://issues.apache.org/jira/browse/FLINK-4565
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Simone Robutti
>
> It seems that Flink SQL supports the uncorrelated sub-query IN operator, but it should also be available in the Table API and tested.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
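Timo's suggestion — parse only the table name and defer the lookup — can be sketched roughly as follows. `UnresolvedTable`, `TableRegistry`, and `resolve` are hypothetical names chosen for illustration; the actual Table API expression classes differ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: an unresolved table reference that is looked up lazily
// in a registry after parsing, rather than being resolved at parse time.
final class UnresolvedTable {
    final String name;
    UnresolvedTable(String name) { this.name = name; }
}

final class TableRegistry {
    private final Map<String, Object> tables = new HashMap<>();

    void register(String name, Object table) {
        tables.put(name, table);
    }

    // Resolution happens after parsing, once the registry contents are known.
    Object resolve(UnresolvedTable ref) {
        Object t = tables.get(ref.name);
        if (t == null) {
            throw new IllegalArgumentException("Unknown table: " + ref.name);
        }
        return t;
    }
}
```

The point of the two-phase design is that the parser needs no access to catalog state; validation errors for unknown tables surface at resolution time instead.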
[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers
[ https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473826#comment-15473826 ] ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78001809

--- Diff: flink-core/src/main/resources/PojoSerializerTemplate.ftl ---
[jira] [Assigned] (FLINK-4579) Add StateBackendFactory for RocksDB Backend
[ https://issues.apache.org/jira/browse/FLINK-4579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-4579:
---

Assignee: Jark Wu

> Add StateBackendFactory for RocksDB Backend
> -------------------------------------------
>
> Key: FLINK-4579
> URL: https://issues.apache.org/jira/browse/FLINK-4579
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Reporter: Aljoscha Krettek
> Assignee: Jark Wu
>
> Right now, we only have a {{StateBackendFactory}} for the {{FsStateBackend}}, which means that users cannot select the RocksDB backend in the Flink configuration.
> If we add a factory for RocksDB, we should also think about adding the RocksDB backend to the standard distribution lib; otherwise it is only usable if users manually place the RocksDB jars in the Flink lib folder.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
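A factory for the RocksDB backend would let the configuration select a backend by name. A rough sketch of that lookup pattern follows; all names here (`BackendRegistry`, the `String` stand-in for a backend instance) are hypothetical and do not reflect Flink's actual `StateBackendFactory` API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch of factory-based backend selection: the name read from
// the configuration picks a registered factory, which builds the backend.
final class BackendRegistry {
    private final Map<String, Supplier<String>> factories = new HashMap<>();

    void register(String name, Supplier<String> factory) {
        factories.put(name, factory);
    }

    String create(String configuredName) {
        Supplier<String> f = factories.get(configuredName);
        if (f == null) {
            throw new IllegalArgumentException("No backend factory registered for: " + configuredName);
        }
        return f.get();
    }
}
```

The factory indirection is what makes a config-only choice possible: the core runtime never links against the RocksDB classes directly, so they only need to be on the classpath when that backend is actually configured.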
[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers
[ https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473806#comment-15473806 ] ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78000250

--- Diff: flink-core/src/main/resources/PojoComparatorTemplate.ftl ---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package ${packageName};
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.NullKeyFieldException;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase;
+import org.apache.flink.api.java.typeutils.runtime.GenTypeComparatorProxy;
+import org.apache.flink.util.InstantiationUtil;
+
+public final class ${className} extends CompositeTypeComparator implements java.io.Serializable {
--- End diff --

Maybe add `<${className}>` as generic argument to `CompositeTypeComparator`

> GSoC: Code Generation in Serializers
> ------------------------------------
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
> Issue Type: Improvement
> Components: Type Serialization System
> Reporter: Márton Balassi
> Assignee: Gabor Horvath
> Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a performance bottleneck in some scenarios. These performance problems were also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is slow [2].
> For the complete proposal see [3].
> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
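The generated comparator class is produced by filling placeholders such as `${className}` in the FreeMarker template above. A minimal stand-in for that substitution step (naive string replacement, not FreeMarker itself) looks like:

```java
import java.util.Map;

final class TemplateRenderer {
    // Naive placeholder substitution standing in for FreeMarker's ${...}
    // interpolation; each "${key}" in the template is replaced by its value.
    static String render(String template, Map<String, String> vars) {
        String out = template;
        for (Map.Entry<String, String> e : vars.entrySet()) {
            out = out.replace("${" + e.getKey() + "}", e.getValue());
        }
        return out;
    }
}
```

Applied to the class declaration, the reviewer's suggestion amounts to rendering `extends CompositeTypeComparator<${className}>`, so the generated class extends a properly parameterized supertype rather than the raw `CompositeTypeComparator`.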
[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers
[ https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473799#comment-15473799 ] ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77999725

--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorGenerator.java ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
+
+public final class PojoComparatorGenerator {
+    private static final String packageName = "org.apache.flink.api.java.typeutils.runtime.generated";
+
+    private transient Field[] keyFields;
+    private transient Integer[] keyFieldIds;
+    private final TypeComparator[] comparators;
+    private final TypeSerializer serializer;
+    private final Class type;
+    private final ExecutionConfig config;
+    private String code;
+
+    public PojoComparatorGenerator(Field[] keyFields, TypeComparator[] comparators, TypeSerializer serializer,
+            Class type, Integer[] keyFieldIds, ExecutionConfig config) {
+        this.keyFields = keyFields;
+        this.comparators = (TypeComparator[]) comparators;
+
+        this.type = type;
+        this.serializer = serializer;
+        this.keyFieldIds = keyFieldIds;
+        this.config = config;
+    }
+
+    public TypeComparator createComparator() {
+        // Multiple comparators can be generated for each type based on a list of keys. The list of keys and the
+        // type name should determine the generated comparator. This information is used for caching (avoiding
+        // recompilation). Note that the name of the field alone is not sufficient, because nested POJOs might
+        // have a field with the same name.
+        StringBuilder keyBuilder = new StringBuilder();
+        for (Integer i : keyFieldIds) {
+            keyBuilder.append(i);
+            keyBuilder.append("_");
+        }
+        final String className = type.getCanonicalName().replace('.', '_') + "_GeneratedComparator"
+                + keyBuilder.toString();
+        final String fullClassName = packageName + "." + className;
+        Class comparatorClazz;
+        code = InstantiationUtil.getCodeForCachedClass(fullClassName);
+        if (code == null) {
+            generateCode(className);
+        }
+        if (config.isWrapGeneratedClassesEnabled()) {
+            return new GenTypeComparatorProxy<>(type, fullClassName, code, comparators, serializer);
+        }
+        try {
+            comparatorClazz = InstantiationUtil.compile(type.getClassLoader(), fullClassName, code);
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to generate comparator: " + className, e);
+        }
+        Constructor[] ctors = comparatorClazz.getConstructors();
+        assert ctors.length == 1;
+        try {
+            return (TypeComparator) ctors[0].newInstance(new Object[]{comparators, serializer, type});
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to
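The cache key in the snippet above is encoded in the generated class name: the POJO's canonical name plus the ordered key-field ids, so that different key lists for the same type map to different cached classes. Distilled from the snippet:

```java
final class ComparatorNaming {
    // Mirrors the naming scheme in the snippet: the generated class name is
    // derived from the POJO's canonical name plus the ordered key-field ids,
    // so distinct key lists yield distinct (separately cached) classes.
    static String generatedClassName(String canonicalTypeName, int[] keyFieldIds) {
        StringBuilder sb = new StringBuilder(canonicalTypeName.replace('.', '_'))
                .append("_GeneratedComparator");
        for (int id : keyFieldIds) {
            sb.append(id).append('_');
        }
        return sb.toString();
    }
}
```

Using numeric field ids rather than field names is what makes the key unambiguous: as the code comment notes, nested POJOs can contain fields with the same name.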
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473775#comment-15473775 ] ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77997800

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -273,7 +276,7 @@ public void releaseOutputs() {
 		// now create the operator and give it the output collector to write its output to
 		OneInputStreamOperator chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
-		chainedOperator.setup(containingTask, operatorConfig, output);
+		chainedOperator.setup(containingTask, operatorConfig, output, streamOutputs.size() == 0);
--- End diff --

Could we not set this value in the `operatorConfig` instead?

> Measure latency of elements and expose it through web interface
> ---------------------------------------------------------------
>
> Key: FLINK-3660
> URL: https://issues.apache.org/jira/browse/FLINK-3660
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming
> Reporter: Robert Metzger
> Assignee: Robert Metzger
> Fix For: pre-apache
>
> It would be nice to expose the end-to-end latency of a streaming job in the web interface.
> To achieve this, my initial thought was to attach an ingestion-time timestamp at the sources to each record.
> However, this introduces overhead for a monitoring feature that users might not even use (8 bytes per element, plus a System.currentTimeMillis() call for each element).
> Therefore, I suggest implementing this feature by periodically sending special events, similar to watermarks, through the topology.
> Those {{LatencyMarks}} are emitted at a configurable interval at the sources and forwarded by the tasks. The sinks compare the timestamp of the latency marks with their current system time to determine the latency.
> The latency marks will not add to the latency of a job, but the marks will be delayed similarly to regular records, so their latency will approximate the record latency.
> The above suggestion expects the clocks on all taskmanagers to be in sync. Otherwise, the measured latencies would also include the offsets between the taskmanagers' clocks.
> In a second step, we can try to mitigate this issue by using the JobManager as a central timing service. The TaskManagers will periodically query the JM for the current time in order to determine the offset with their clock.
> This offset would still include the network latency between TM and JM, but it would still lead to reasonably good estimates.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
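The two measurements the issue describes — sink-side latency from a marker's source timestamp, and a clock-offset estimate obtained by querying the JobManager — reduce to simple arithmetic. The sketch below is a simplified illustration of that arithmetic (an NTP-style midpoint estimate for the offset), not Flink's actual implementation:

```java
final class LatencySketch {
    // Latency of a marker observed at the sink, assuming the source and sink
    // clocks agree (or the offset has already been corrected).
    static long markerLatency(long markEmittedAtMillis, long sinkNowMillis) {
        return sinkNowMillis - markEmittedAtMillis;
    }

    // NTP-style offset estimate for the proposed JobManager timing service:
    // assume the JM's timestamp was taken halfway through the round trip.
    static long clockOffset(long sendMillis, long jmMillis, long receiveMillis) {
        long roundTrip = receiveMillis - sendMillis;
        return jmMillis - (sendMillis + roundTrip / 2);
    }
}
```

As the issue notes, the midpoint assumption folds half the TM-JM round-trip time into the estimate, which is why the result is "reasonably good" rather than exact.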
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473764#comment-15473764 ] ASF GitHub Bot commented on FLINK-3660:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77997086

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java ---
@@ -83,6 +83,7 @@ public void processWatermark2(Watermark mark) throws Exception {
 	}
 }
+
--- End diff --

unnecessary new line
[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77997021

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
@@ -347,4 +521,5 @@ public void close() {
 		output.close();
 	}
 }
+
--- End diff --

unnecessary new line

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473762#comment-15473762 ]

ASF GitHub Bot commented on FLINK-3660:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77997021

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -347,4 +521,5 @@ public void close() {
     			output.close();
     		}
     	}
    +
    --- End diff --

    unnecessary new line
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473760#comment-15473760 ]

ASF GitHub Bot commented on FLINK-3660:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77996891

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -94,20 +102,38 @@
     	/** Backend for keyed state. This might be empty if we're not on a keyed stream. */
     	private transient KeyedStateBackend<?> keyedStateBackend;

    -	protected transient MetricGroup metrics;
    +	// --- Metrics ---
    +
    +	/** Metric group for the operator */
    +	protected MetricGroup metrics;
    +
    +	/** Flag indicating if this operator is a sink */
    +	protected transient boolean isSink = false;
    +
    +	protected LatencyGauge latencyGauge;

     	// ------------------------------------------------------------------------
     	//  Life Cycle
     	// ------------------------------------------------------------------------

     	@Override
    -	public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
    +	public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output, boolean isSink) {
     		this.container = containingTask;
     		this.config = config;
     		String operatorName = containingTask.getEnvironment().getTaskInfo().getTaskName().split("->")[config.getChainIndex()].trim();
     		this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName);
     		this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut"));
    +		this.isSink = isSink;
    +		Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
    +		int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
    +		if(historySize <= 0) {
    +			LOG.warn("{} has been set to a value below 0: {}. Using default.", ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, historySize);
    +			historySize = ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE;
    +		}
    +
    +		latencyGauge = new LatencyGauge(this.metrics, historySize, !isSink);
    --- End diff --

    this looks a bit odd; `latencyGauge = this.metrics.gauge(new LatencyGauge(historySize, isSink), "latency")` would be more consistent to how other metrics are registered.
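The `historySize` read in the hunk above bounds how many latency measurements the gauge retains. A minimal sketch of such a bounded history follows, with the same fall-back-to-default validation; the class name and default value are assumptions for illustration, not the PR's actual `LatencyGauge`:

```java
import java.util.ArrayDeque;

// Sketch of a bounded latency history; names and the default size are
// invented for illustration, not taken from the PR.
public class LatencyHistorySketch {
    static final int DEFAULT_HISTORY_SIZE = 128; // stand-in for the configured default

    private final int historySize;
    private final ArrayDeque<Long> latencies = new ArrayDeque<>();

    LatencyHistorySketch(int configuredHistorySize) {
        // Mirrors the validation in the diff: non-positive values fall back
        // to the default instead of breaking the gauge.
        this.historySize = configuredHistorySize <= 0 ? DEFAULT_HISTORY_SIZE : configuredHistorySize;
    }

    /** Record one measurement, evicting the oldest once the bound is hit. */
    void report(long latencyMillis) {
        if (latencies.size() == historySize) {
            latencies.removeFirst();
        }
        latencies.addLast(latencyMillis);
    }

    int size() {
        return latencies.size();
    }

    public static void main(String[] args) {
        LatencyHistorySketch history = new LatencyHistorySketch(2);
        history.report(10L);
        history.report(20L);
        history.report(30L); // evicts 10
        System.out.println(history.size()); // 2
    }
}
```

The bound keeps the gauge's memory footprint constant no matter how many markers arrive.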
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473754#comment-15473754 ]

ASF GitHub Bot commented on FLINK-3660:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77996545

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -94,20 +102,38 @@
    (the hunk quotes the same setup() changes shown in the previous comment, ending at:)
    +		int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
    +		if(historySize <= 0) {
    --- End diff --

    missing space after if
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473751#comment-15473751 ]

ASF GitHub Bot commented on FLINK-3660:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77996378

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
    @@ -149,6 +154,8 @@
     	private LinkedHashSet<Class<?>> registeredPojoTypes = new LinkedHashSet<>();
    +
    --- End diff --

    new lines should be removed
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473745#comment-15473745 ]

ASF GitHub Bot commented on FLINK-3660:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2386#discussion_r77996152

    --- Diff: docs/monitoring/metrics.md ---
    @@ -475,52 +474,76 @@ Flink exposes the following system metrics:

    (the hunk re-indents the existing Task metric rows — currentLowWatermark, lastCheckpointDuration, lastCheckpointSize, restartingTime, numBytesInLocal, numBytesInRemote, numBytesOut — and the Operator rows numRecordsIn, numRecordsOut, numSplitsProcessed, then adds:)

    +      latency
    +      A latency gauge reporting the latency distribution from the different sources.
    +
    +### Latency tracking
    +
    +Flink allows to track the latency of records traveling through the system. To enable the latency tracking
    +a `latencyTrackingInterval` (in milliseconds) has to be set to a positive value in the `ExecutionConfig`.
    +
    +At the `latencyTrackingInterval`, the sources will periodically emit a special record, called a `LatencyMaker`.
    --- End diff --

    LatencyMaker -> LatencyMarker
[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers
[ https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473744#comment-15473744 ]

ASF GitHub Bot commented on FLINK-3599:
---------------------------------------

Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77996133

    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerGenerator.java ---
    @@ -0,0 +1,259 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils.runtime;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Constructor;
    +import java.lang.reflect.Field;
    +import java.lang.reflect.Modifier;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
    +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.modifyStringForField;
    +
    +public final class PojoSerializerGenerator {
    --- End diff --

    Please add javadoc and audience/stability annotation (probably Internal).

> GSoC: Code Generation in Serializers
> ------------------------------------
>
>                 Key: FLINK-3599
>                 URL: https://issues.apache.org/jira/browse/FLINK-3599
>             Project: Flink
>          Issue Type: Improvement
>          Components: Type Serialization System
>            Reporter: Márton Balassi
>            Assignee: Gabor Horvath
>              Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a performance bottleneck in some scenarios. These performance problems were also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is slow [2].
> For the complete proposal see [3].
> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk
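The "reflection is slow" motivation behind FLINK-3599 comes down to the difference between looking a field up at runtime and reading it directly, which is what a generated serializer can emit. A self-contained sketch (the `Point` class is an invented example, not Flink code):

```java
import java.lang.reflect.Field;

// Illustrates the FLINK-3599 motivation: reflective field access (as in
// PojoSerializer today) vs. the direct access generated code can use.
public class ReflectionVsDirect {
    public static class Point {
        public int x;
    }

    // Reflective path: field lookup, accessibility check, and boxing on
    // every access — hard for the JIT to optimize.
    static int readReflectively(Point p) {
        try {
            Field f = Point.class.getDeclaredField("x");
            f.setAccessible(true);
            return (Integer) f.get(p);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    // Generated path: a plain field read the JIT can inline.
    static int readDirectly(Point p) {
        return p.x;
    }

    public static void main(String[] args) {
        Point p = new Point();
        p.x = 42;
        System.out.println(readReflectively(p)); // 42
        System.out.println(readDirectly(p));     // 42
    }
}
```

Both reads return the same value; the win of code generation is that the direct form avoids the per-access lookup and boxing.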
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473747#comment-15473747 ] ASF GitHub Bot commented on FLINK-3660: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2386#discussion_r77996178 --- Diff: docs/monitoring/metrics.md --- @@ -475,52 +474,76 @@ Flink exposes the following system metrics: - -Task -currentLowWatermark -The lowest watermark a task has received. - - -lastCheckpointDuration -The time it took to complete the last checkpoint. - - -lastCheckpointSize -The total size of the last checkpoint. - - -restartingTime -The time it took to restart the job. - - -numBytesInLocal -The total number of bytes this task has read from a local source. - - -numBytesInRemote -The total number of bytes this task has read from a remote source. - - -numBytesOut -The total number of bytes this task has emitted. - - - - -Operator -numRecordsIn -The total number of records this operator has received. - - -numRecordsOut -The total number of records this operator has emitted. - - -numSplitsProcessed -The total number of InputSplits this data source has processed. - + Task + currentLowWatermark + The lowest watermark a task has received. + + + lastCheckpointDuration + The time it took to complete the last checkpoint. + + + lastCheckpointSize + The total size of the last checkpoint. + + + restartingTime + The time it took to restart the job. + + + numBytesInLocal + The total number of bytes this task has read from a local source. + + + numBytesInRemote + The total number of bytes this task has read from a remote source. + + + numBytesOut + The total number of bytes this task has emitted. + + + Operator + numRecordsIn + The total number of records this operator has received. + + + numRecordsOut + The total number of records this operator has emitted. 
+ + + numSplitsProcessed + The total number of InputSplits this data source has processed (if the operator is a data source). + + + latency + A latency gauge reporting the latency distribution from the different sources. + +### Latency tracking + +Flink allows to track the latency of records traveling through the system. To enable the latency tracking +a `latencyTrackingInterval` (in milliseconds) has to be set to a positive value in the `ExecutionConfig`. + +At the `latencyTrackingInterval`, the sources will periodically emit a special record, called a `LatencyMaker`. +The marker contains a timestamp from the time when the record has been emitted at the sources. +Latency marker can not overtake regular user records, thus if records are queuing up in front of an operator, --- End diff -- marker -> markers > Measure latency of elements and expose it through web interface > --- > > Key: FLINK-3660 > URL: https://issues.apache.org/jira/browse/FLINK-3660 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Reporter: Robert Metzger >Assignee: Robert Metzger > Fix For: pre-apache > > > It would be nice to expose the end-to-end latency of a streaming job in the > webinterface. > To achieve this, my initial thought was to attach an ingestion-time timestamp > at the sources to each record. > However, this introduces overhead for a monitoring feature users might not > even use (8 bytes for each element + System.currentTimeMilis() on each > element). > Therefore, I suggest to implement this feature by periodically sending > special events, similar to watermarks through the topology. > Those {{LatencyMarks}} are emitted at a configurable interval at the sources > and forwarded by the tasks. The sinks will compare the timestamp of the > latency marks with their current system time to determine the latency. > The latency marks will not add to the latency of a job,
[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2386#discussion_r77996178 --- Diff: docs/monitoring/metrics.md --- @@ -475,52 +474,76 @@ Flink exposes the following system metrics: - -Task -currentLowWatermark -The lowest watermark a task has received. - - -lastCheckpointDuration -The time it took to complete the last checkpoint. - - -lastCheckpointSize -The total size of the last checkpoint. - - -restartingTime -The time it took to restart the job. - - -numBytesInLocal -The total number of bytes this task has read from a local source. - - -numBytesInRemote -The total number of bytes this task has read from a remote source. - - -numBytesOut -The total number of bytes this task has emitted. - - - - -Operator -numRecordsIn -The total number of records this operator has received. - - -numRecordsOut -The total number of records this operator has emitted. - - -numSplitsProcessed -The total number of InputSplits this data source has processed. - + Task + currentLowWatermark + The lowest watermark a task has received. + + + lastCheckpointDuration + The time it took to complete the last checkpoint. + + + lastCheckpointSize + The total size of the last checkpoint. + + + restartingTime + The time it took to restart the job. + + + numBytesInLocal + The total number of bytes this task has read from a local source. + + + numBytesInRemote + The total number of bytes this task has read from a remote source. + + + numBytesOut + The total number of bytes this task has emitted. + + + Operator + numRecordsIn + The total number of records this operator has received. + + + numRecordsOut + The total number of records this operator has emitted. + + + numSplitsProcessed + The total number of InputSplits this data source has processed (if the operator is a data source). + + + latency + A latency gauge reporting the latency distribution from the different sources. 
+ +### Latency tracking + +Flink allows to track the latency of records traveling through the system. To enable the latency tracking +a `latencyTrackingInterval` (in milliseconds) has to be set to a positive value in the `ExecutionConfig`. + +At the `latencyTrackingInterval`, the sources will periodically emit a special record, called a `LatencyMaker`. +The marker contains a timestamp from the time when the record has been emitted at the sources. +Latency marker can not overtake regular user records, thus if records are queuing up in front of an operator, --- End diff -- marker -> markers --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
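The quoted documentation describes the mechanism: sources periodically emit a timestamped `LatencyMarker` that travels with the stream, and downstream operators derive a latency distribution from it. A minimal self-contained sketch of that idea follows; these are not Flink's actual classes, and all names here are invented for illustration.

```java
// Minimal illustration of the latency-tracking scheme described above: the
// source stamps a marker with its emission time and a sink subtracts that
// timestamp from its own clock. All class and method names are invented here;
// this is not Flink's implementation.
public class LatencyDemo {

    // Special record carrying the time it left the source.
    public static final class LatencyMarker {
        public final long markedTime;
        public LatencyMarker(long markedTime) { this.markedTime = markedTime; }
    }

    // What a latency gauge would record for one marker: now - markedTime.
    public static long latencyMillis(LatencyMarker marker, long now) {
        return now - marker.markedTime;
    }

    public static void main(String[] args) {
        // Source emits a marker at t=1000 ms; the sink observes it at t=1250 ms.
        LatencyMarker marker = new LatencyMarker(1_000L);
        System.out.println("observed latency: " + latencyMillis(marker, 1_250L) + " ms");
    }
}
```

As the quoted docs note, markers cannot overtake regular records, so the observed value also includes any queueing delay in front of backpressured operators.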
[GitHub] flink pull request #2386: [FLINK-3660] Measure latency and exposes them via ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2386#discussion_r77996152 --- Diff: docs/monitoring/metrics.md --- @@ -475,52 +474,76 @@ Flink exposes the following system metrics: - -Task -currentLowWatermark -The lowest watermark a task has received. - - -lastCheckpointDuration -The time it took to complete the last checkpoint. - - -lastCheckpointSize -The total size of the last checkpoint. - - -restartingTime -The time it took to restart the job. - - -numBytesInLocal -The total number of bytes this task has read from a local source. - - -numBytesInRemote -The total number of bytes this task has read from a remote source. - - -numBytesOut -The total number of bytes this task has emitted. - - - - -Operator -numRecordsIn -The total number of records this operator has received. - - -numRecordsOut -The total number of records this operator has emitted. - - -numSplitsProcessed -The total number of InputSplits this data source has processed. - + Task + currentLowWatermark + The lowest watermark a task has received. + + + lastCheckpointDuration + The time it took to complete the last checkpoint. + + + lastCheckpointSize + The total size of the last checkpoint. + + + restartingTime + The time it took to restart the job. + + + numBytesInLocal + The total number of bytes this task has read from a local source. + + + numBytesInRemote + The total number of bytes this task has read from a remote source. + + + numBytesOut + The total number of bytes this task has emitted. + + + Operator + numRecordsIn + The total number of records this operator has received. + + + numRecordsOut + The total number of records this operator has emitted. + + + numSplitsProcessed + The total number of InputSplits this data source has processed (if the operator is a data source). + + + latency + A latency gauge reporting the latency distribution from the different sources. 
+ +### Latency tracking + +Flink allows to track the latency of records traveling through the system. To enable the latency tracking +a `latencyTrackingInterval` (in milliseconds) has to be set to a positive value in the `ExecutionConfig`. + +At the `latencyTrackingInterval`, the sources will periodically emit a special record, called a `LatencyMaker`. --- End diff -- LatencyMaker -> LatencyMarker
[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers
[ https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473738#comment-15473738 ] ASF GitHub Bot commented on FLINK-3599: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2211#discussion_r77995790 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenTypeSerializerProxy.java --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.lang.reflect.Constructor; + +public class GenTypeSerializerProxy extends TypeSerializer { --- End diff -- Please add javadoc. 
> GSoC: Code Generation in Serializers > > > Key: FLINK-3599 > URL: https://issues.apache.org/jira/browse/FLINK-3599 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Márton Balassi >Assignee: Gabor Horvath > Labels: gsoc2016, mentor > > The current implementation of the serializers can be a > performance bottleneck in some scenarios. These performance problems were > also reported on the mailing list recently [1]. > E.g. the PojoSerializer uses reflection for accessing the fields, which is > slow [2]. > For the complete proposal see [3]. > [1] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html > [2] > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369 > [3] > https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers
[ https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473732#comment-15473732 ] ASF GitHub Bot commented on FLINK-3599: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2211#discussion_r77995491 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenTypeComparatorProxy.java --- @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.typeutils.CompositeTypeComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.lang.reflect.Constructor; +import java.util.List; + +public class GenTypeComparatorProxy extends CompositeTypeComparator implements java.io.Serializable { --- End diff -- Please add javadoc. 
[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2211#discussion_r77995157 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenTypeComparatorProxy.java --- @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.typeutils.CompositeTypeComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.lang.reflect.Constructor; +import java.util.List; + +public class GenTypeComparatorProxy extends CompositeTypeComparator implements java.io.Serializable { + private final String code; + private final String name; + private final Class clazz; + private final TypeComparator[] comparators; + private final TypeSerializer serializer; + + transient private CompositeTypeComparator impl = null; + + private void compile() { + try { + assert impl == null; + Class comparatorClazz = InstantiationUtil.compile(clazz.getClassLoader(), name, code); + Constructor[] ctors = comparatorClazz.getConstructors(); + assert ctors.length == 1; + impl = (CompositeTypeComparator) ctors[0].newInstance(new Object[]{comparators, serializer, clazz}); + } catch (Exception e) { + throw new RuntimeException("Unable to generate serializer: " + name, e); + } + } + + public GenTypeComparatorProxy(Class clazz, String name, String code,TypeComparator[] comparators, + TypeSerializer serializer) { + this.name = name; + this.code = code; + this.clazz = clazz; + this.comparators = comparators; + this.serializer = serializer; + compile(); + } + + @SuppressWarnings("unchecked") --- End diff -- Not needed if `comparators` has `` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
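The `GenTypeComparatorProxy` under review keeps the generated source as a string and lazily compiles and instantiates it through the class's single public constructor. That pattern can be sketched in isolation; below, a precompiled stand-in class replaces the output of `InstantiationUtil.compile()`, and every name apart from that is hypothetical.

```java
import java.lang.reflect.Constructor;

// Sketch of the lazy compile-and-instantiate pattern used by the proxy classes
// in this PR: the proxy keeps everything needed to (re)create the generated
// implementation and reflectively invokes its single constructor on first use
// (e.g. after the proxy has been deserialized on a worker). A precompiled
// stand-in class replaces the runtime-compiled code here.
public class ProxyDemo {

    // Stand-in for a generated comparator/serializer implementation.
    public static final class GeneratedImpl {
        public final String name;
        public GeneratedImpl(String name) { this.name = name; }
    }

    public static final class Proxy {
        private final Class<?> implClass;
        private final Object[] ctorArgs;
        private transient GeneratedImpl impl;  // rebuilt lazily, not serialized

        public Proxy(Class<?> implClass, Object... ctorArgs) {
            this.implClass = implClass;
            this.ctorArgs = ctorArgs;
        }

        public GeneratedImpl get() {
            if (impl == null) {
                try {
                    // The generated class is expected to have exactly one ctor.
                    Constructor<?>[] ctors = implClass.getConstructors();
                    impl = (GeneratedImpl) ctors[0].newInstance(ctorArgs);
                } catch (Exception e) {
                    throw new RuntimeException("Unable to instantiate " + implClass, e);
                }
            }
            return impl;
        }
    }

    public static void main(String[] args) {
        Proxy proxy = new Proxy(GeneratedImpl.class, "myComparator");
        System.out.println(proxy.get().name);
    }
}
```

The real proxy additionally carries the code string itself so the class can be compiled on whatever JVM the proxy lands on; the reflective single-constructor invocation is the part mirrored above.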
[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers
[ https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473715#comment-15473715 ] ASF GitHub Bot commented on FLINK-3599: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2211#discussion_r7799 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java --- @@ -419,4 +481,61 @@ public String toString() { return "NamedFlatFieldDescriptor [name="+fieldName+" position="+getPosition()+" typeInfo="+getType()+"]"; } } + + public static String accessStringForField(Field f) { + String fieldName = f.getName(); + if (Modifier.isPublic(f.getModifiers())) { + return fieldName; + } + String getterName = "get" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1); + Class parentClazz = f.getDeclaringClass(); + try { + parentClazz.getMethod(getterName, new Class[0]); + } catch (NoSuchMethodException e) { + // No getter, it might be a scala class. + return fieldName + "()"; + } + return getterName + "()"; + } + + public static String modifyStringForField(Field f, String arg) { + String fieldName = f.getName(); + if (Modifier.isPublic(f.getModifiers())) { + if (f.getType().isPrimitive()) { + return f.getName() + " = (" + + primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + ")" + arg; + } else { + return f.getName() + " = (" + f.getType().getCanonicalName() + ")" + arg; + } + } + String setterName = "set" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1); + Class parentClazz = f.getDeclaringClass(); + try { + parentClazz.getMethod(setterName, f.getType()); + } catch (NoSuchMethodException e) { + // No getter, it might be a scala class. --- End diff -- Same as above. 
[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2211#discussion_r77993831 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java --- @@ -419,4 +481,61 @@ public String toString() { return "NamedFlatFieldDescriptor [name="+fieldName+" position="+getPosition()+" typeInfo="+getType()+"]"; } } + + public static String accessStringForField(Field f) { + String fieldName = f.getName(); + if (Modifier.isPublic(f.getModifiers())) { + return fieldName; + } + String getterName = "get" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1); + Class parentClazz = f.getDeclaringClass(); + try { + parentClazz.getMethod(getterName, new Class[0]); + } catch (NoSuchMethodException e) { + // No getter, it might be a scala class. + return fieldName + "()"; + } + return getterName + "()"; + } + + public static String modifyStringForField(Field f, String arg) { + String fieldName = f.getName(); + if (Modifier.isPublic(f.getModifiers())) { + if (f.getType().isPrimitive()) { + return f.getName() + " = (" + + primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + ")" + arg; --- End diff -- Why do we need to cast to the boxed types if the field is non-boxed? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
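For context on the cast being questioned: the generated setter assigns an untyped `Object` into the POJO field, so for a primitive field the emitted source casts to the boxed wrapper and relies on auto-unboxing at the assignment. A minimal sketch of what that emitted string compiles to; the `Pojo` class and helper method are invented for illustration.

```java
public class BoxedCastDemo {

    // A POJO with a public primitive field, as the generated code would target.
    public static final class Pojo {
        public int x;
    }

    // Mirrors what a generated assignment string along the lines of
    //   "x = (java.lang.Integer)" + arg
    // compiles to: the untyped argument is cast to the wrapper type, and
    // auto-unboxing converts it to the primitive on assignment.
    public static int assignToPrimitive(Object arg) {
        return (java.lang.Integer) arg;
    }

    public static void main(String[] args) {
        Pojo p = new Pojo();
        Object fromSerializer = Integer.valueOf(42);  // values arrive as Object
        p.x = assignToPrimitive(fromSerializer);
        System.out.println(p.x);
    }
}
```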
[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers
[ https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473689#comment-15473689 ] ASF GitHub Bot commented on FLINK-3599: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2211#discussion_r77993022 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java --- @@ -419,4 +481,61 @@ public String toString() { return "NamedFlatFieldDescriptor [name="+fieldName+" position="+getPosition()+" typeInfo="+getType()+"]"; } } + + public static String accessStringForField(Field f) { + String fieldName = f.getName(); + if (Modifier.isPublic(f.getModifiers())) { + return fieldName; + } + String getterName = "get" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1); + Class parentClazz = f.getDeclaringClass(); + try { + parentClazz.getMethod(getterName, new Class[0]); + } catch (NoSuchMethodException e) { + // No getter, it might be a scala class. --- End diff -- Is this possible at this point? The rules of being a POJO should not allow this, right?
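The accessor-derivation logic being reviewed can be exercised standalone. The sketch below mirrors the steps from the diff: a public field is read directly, otherwise a JavaBean getter name is derived, and if no such method exists the code falls back to a Scala-style `field()` accessor. The `Pojo` class here is invented for illustration.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Standalone version of the getter-lookup logic quoted above: given a Field,
// produce the Java source fragment the code generator would emit to read it.
public class AccessorDemo {

    public static final class Pojo {
        public int count;            // public: accessed directly
        private String name;         // private, with a JavaBean getter
        public String getName() { return name; }
    }

    public static String accessStringForField(Field f) {
        String fieldName = f.getName();
        if (Modifier.isPublic(f.getModifiers())) {
            return fieldName;
        }
        String getterName = "get" + Character.toUpperCase(fieldName.charAt(0))
                + fieldName.substring(1);
        try {
            f.getDeclaringClass().getMethod(getterName);
        } catch (NoSuchMethodException e) {
            // No JavaBean getter; assume a Scala-style accessor "field()".
            return fieldName + "()";
        }
        return getterName + "()";
    }

    public static void main(String[] args) throws Exception {
        System.out.println(accessStringForField(Pojo.class.getField("count")));        // count
        System.out.println(accessStringForField(Pojo.class.getDeclaredField("name"))); // getName()
    }
}
```

The review question stands regardless: Flink's POJO rules require every non-public field to have a getter/setter pair, so the Scala fallback branch may be unreachable once those rules have been checked.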
[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers
[ https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473692#comment-15473692 ] ASF GitHub Bot commented on FLINK-3599: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2211#discussion_r77993053 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java --- @@ -419,4 +481,61 @@ public String toString() { return "NamedFlatFieldDescriptor [name="+fieldName+" position="+getPosition()+" typeInfo="+getType()+"]"; } } + + public static String accessStringForField(Field f) { + String fieldName = f.getName(); + if (Modifier.isPublic(f.getModifiers())) { + return fieldName; + } + String getterName = "get" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1); + Class parentClazz = f.getDeclaringClass(); + try { + parentClazz.getMethod(getterName, new Class[0]); + } catch (NoSuchMethodException e) { + // No getter, it might be a scala class. + return fieldName + "()"; + } + return getterName + "()"; + } + + public static String modifyStringForField(Field f, String arg) { + String fieldName = f.getName(); + if (Modifier.isPublic(f.getModifiers())) { + if (f.getType().isPrimitive()) { + return f.getName() + " = (" + + primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + ")" + arg; + } else { + return f.getName() + " = (" + f.getType().getCanonicalName() + ")" + arg; + } + } + String setterName = "set" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1); + Class parentClazz = f.getDeclaringClass(); + try { + parentClazz.getMethod(setterName, f.getType()); + } catch (NoSuchMethodException e) { + // No getter, it might be a scala class. --- End diff -- Same question as above. 
> GSoC: Code Generation in Serializers > > > Key: FLINK-3599 > URL: https://issues.apache.org/jira/browse/FLINK-3599 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Márton Balassi >Assignee: Gabor Horvath > Labels: gsoc2016, mentor > > The current implementation of the serializers can be a > performance bottleneck in some scenarios. These performance problems were > also reported on the mailing list recently [1]. > E.g. the PojoSerializer uses reflection for accessing the fields, which is > slow [2]. > For the complete proposal see [3]. > [1] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html > [2] > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369 > [3] > https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk -- This message was sent by Atlassian JIRA (v6.3.4#6332)
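The getter-resolution logic under review can be sketched stand-alone. The class below is a hedged reconstruction (class and method names are illustrative, not Flink's API): a field maps either to its own name (public field), to a JavaBean getter call such as `getName()`, or to a Scala-style `name()` accessor when no conventional getter exists — the case ggevay's question is about:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class AccessStringSketch {
    // Example POJO with one public field and one getter-backed field.
    public static class Pojo {
        public int id;
        private String name;
        public String getName() { return name; }
    }

    // Maps a field to the source snippet that reads it: the field name for
    // public fields, "getFoo()" when a JavaBean getter exists, and the
    // Scala-style "foo()" accessor otherwise.
    static String accessStringForField(Field f) {
        String fieldName = f.getName();
        if (Modifier.isPublic(f.getModifiers())) {
            return fieldName;
        }
        String getterName = "get" + Character.toUpperCase(fieldName.charAt(0))
                + fieldName.substring(1);
        try {
            f.getDeclaringClass().getMethod(getterName);
        } catch (NoSuchMethodException e) {
            // No conventional getter: assume a Scala-style accessor.
            return fieldName + "()";
        }
        return getterName + "()";
    }

    // Convenience wrapper that hides the checked reflection exception.
    static String accessString(Class<?> clazz, String fieldName) {
        try {
            return accessStringForField(clazz.getDeclaredField(fieldName));
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(accessString(Pojo.class, "id"));   // id
        System.out.println(accessString(Pojo.class, "name")); // getName()
    }
}
```

Whether the Scala fall-through branch is actually reachable for types that pass Flink's POJO rules is exactly the reviewer's open question.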
[jira] [Commented] (FLINK-4565) Support for SQL IN operator
[ https://issues.apache.org/jira/browse/FLINK-4565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473685#comment-15473685 ] Simone Robutti commented on FLINK-4565: --- I defined this parser combinator rule ```lazy val in:PackratParser[Expression] = term ~ "IN" ~ table ^^{ case l ~ _ ~ r => In(l,r) }``` `In` takes an expression (containing the field of the left table to be matched against the right table) and a Table. I need to define a parser for a Table, but I have never worked with parser combinators, so I have no idea how to do that. Any suggestions? > Support for SQL IN operator > --- > > Key: FLINK-4565 > URL: https://issues.apache.org/jira/browse/FLINK-4565 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Simone Robutti > > It seems that Flink SQL supports the uncorrelated sub-query IN operator. But > it should also be available in the Table API and tested. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
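Independently of the parser question, the runtime semantics of an uncorrelated IN are plain set membership, which planners typically realize as a semi-join: materialize the right-hand table into a hash set and filter the left side against it. A minimal sketch of those semantics (hypothetical data and names, not Flink's planner code):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class InOperatorSketch {
    // Keeps the rows of 'left' whose key appears in 'right' — the
    // set-membership semantics of an uncorrelated SQL IN predicate,
    // implemented as a hash semi-join.
    static List<Integer> semiJoinIn(List<Integer> left, List<Integer> right) {
        Set<Integer> buildSide = new HashSet<>(right); // build side from the sub-query
        return left.stream()
                .filter(buildSide::contains)           // probe side
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // SELECT x FROM left WHERE x IN (SELECT y FROM right)
        List<Integer> result = semiJoinIn(Arrays.asList(1, 2, 3, 4),
                                          Arrays.asList(2, 4, 6));
        System.out.println(result); // [2, 4]
    }
}
```

A Table API `in` expression would only need to name the right-hand table; the rewrite to a semi-join like the above is the planner's job.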
[jira] [Commented] (FLINK-3660) Measure latency of elements and expose it through web interface
[ https://issues.apache.org/jira/browse/FLINK-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473681#comment-15473681 ] ASF GitHub Bot commented on FLINK-3660: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2386 @aljoscha I updated the pull request, which now builds green (just rebasing fixed the issues). I also keep metrics per operator now, not only for the sinks. > Measure latency of elements and expose it through web interface > --- > > Key: FLINK-3660 > URL: https://issues.apache.org/jira/browse/FLINK-3660 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Reporter: Robert Metzger >Assignee: Robert Metzger > Fix For: pre-apache > > > It would be nice to expose the end-to-end latency of a streaming job in the > web interface. > To achieve this, my initial thought was to attach an ingestion-time timestamp > at the sources to each record. > However, this introduces overhead for a monitoring feature users might not > even use (8 bytes for each element + System.currentTimeMillis() on each > element). > Therefore, I suggest implementing this feature by periodically sending > special events, similar to watermarks, through the topology. > Those {{LatencyMarks}} are emitted at a configurable interval at the sources > and forwarded by the tasks. The sinks will compare the timestamp of the > latency marks with their current system time to determine the latency. > The latency marks will not add to the latency of a job, but the marks will be > delayed similarly to regular records, so their latency will approximate the > record latency. > The above suggestion expects the clocks on all taskmanagers to be in sync. > Otherwise, the measured latencies would also include the offsets between the > taskmanagers' clocks. > In a second step, we can try to mitigate the issue by using the JobManager as > a central timing service.
The TaskManagers will periodically query the JM for > the current time in order to determine the offset of their clock. > This offset would still include the network latency between TM and JM, but it > would lead to reasonably good estimations. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
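The clock-offset correction sketched above is essentially the NTP estimate: the TaskManager records its local time before and after querying the JobManager, assumes the JobManager's answer was produced at the midpoint of the round trip, and takes the difference as the offset. A minimal sketch (names are illustrative, not Flink's API; the marker timestamp is assumed to be already in JobManager time):

```java
public class ClockOffsetSketch {
    // NTP-style offset estimate: assume the JobManager timestamp was taken
    // at the midpoint of the round trip, so
    //   offset ≈ jmTime - (sendTime + receiveTime) / 2.
    // All times in milliseconds; send/receive are TM-local clock readings.
    static long estimateOffset(long sendTime, long jmTime, long receiveTime) {
        return jmTime - (sendTime + receiveTime) / 2;
    }

    // Sink-side latency sample: convert the local clock to JobManager time
    // via the offset, then subtract the marker's (JM-time) timestamp.
    static long markerLatency(long markerTimestamp, long localNow, long offset) {
        return (localNow + offset) - markerTimestamp;
    }

    public static void main(String[] args) {
        // TM clock runs 50 ms ahead of the JM clock; 10 ms round trip:
        // query sent at local 1000, JM answered 955, reply seen at local 1010.
        long offset = estimateOffset(1000, 955, 1010);
        System.out.println(offset);                       // -50

        // Marker stamped at JM-time 900, observed at local time 1000.
        System.out.println(markerLatency(900, 1000, offset)); // 50
    }
}
```

The residual error of the estimate is bounded by the asymmetry of the round trip, which is why it still contains some TM-to-JM network latency.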