[jira] [Assigned] (FLINK-7307) Add proper command line parsing tool to ClusterEntrypoint
[ https://issues.apache.org/jira/browse/FLINK-7307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fang Yong reassigned FLINK-7307: Assignee: Fang Yong

> Add proper command line parsing tool to ClusterEntrypoint
> ---------------------------------------------------------
>
> Key: FLINK-7307
> URL: https://issues.apache.org/jira/browse/FLINK-7307
> Project: Flink
> Issue Type: Improvement
> Components: Cluster Management
> Affects Versions: 1.4.0
> Reporter: Till Rohrmann
> Assignee: Fang Yong
>
> We need to add a proper command line parsing tool to the entry point, {{ClusterEntrypoint#parseArguments}}. At the moment, we are simply using the {{ParameterTool}} as a temporary solution.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
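For context, the {{ParameterTool}}-style parsing being replaced essentially boils down to scanning "--key value" pairs from the argument array. A minimal self-contained sketch of that behavior; class and method names here are illustrative, not Flink's actual API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of "--key value" parsing in the spirit of
// ParameterTool.fromArgs; not the actual Flink implementation.
public class EntrypointArgs {
    public static Map<String, String> parse(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            if (args[i].startsWith("--")) {
                String key = args[i].substring(2);
                // Consume the following token as the value, if one is present.
                if (i + 1 < args.length && !args[i + 1].startsWith("--")) {
                    params.put(key, args[i + 1]);
                    i++;
                } else {
                    params.put(key, "true"); // a bare flag without a value
                }
            }
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> p = parse(new String[] {"--configDir", "/etc/flink", "--webui"});
        System.out.println(p); // configDir=/etc/flink, webui=true
    }
}
```

A dedicated parser (for example one built on a library such as Commons CLI) would add what this sketch lacks: declared options, type validation, and generated usage/help output.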
[jira] [Commented] (FLINK-7147) Support greedy quantifier in CEP
[ https://issues.apache.org/jira/browse/FLINK-7147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113946#comment-16113946 ] ASF GitHub Bot commented on FLINK-7147: --- Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4296 @dawidwys Regarding times().greedy(), the result was not as expected; I have fixed the issue in the latest PR and also updated the doc.

> Support greedy quantifier in CEP
> --------------------------------
>
> Key: FLINK-7147
> URL: https://issues.apache.org/jira/browse/FLINK-7147
> Project: Flink
> Issue Type: Sub-task
> Components: CEP, Table API & SQL
> Reporter: Dian Fu
> Assignee: Dian Fu
>
> A greedy quantifier will try to match the token as many times as possible. For example, for pattern {{a b* c}} (skip till next is used) and inputs {{a b1 b2 c}}, if the quantifier for {{b}} is greedy, it will only output {{a b1 b2 c}}.
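The greedy semantics being added to CEP mirror greedy quantifiers in regular expressions: a greedy {{b*}} consumes as many b's as it can before the rest of the pattern matches. A plain java.util.regex illustration of the concept (this is deliberately not the Flink CEP API):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Regex analogue of greedy vs. reluctant quantifiers: the greedy b*
// consumes all available b's, the reluctant b*? consumes none.
public class GreedyDemo {
    public static String matchGroup(String regex, String input) {
        Matcher m = Pattern.compile(regex).matcher(input);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        System.out.println(matchGroup("a(b*)c", "abbc"));  // greedy: "bb"
        System.out.println(matchGroup("a(b*?)", "abbc"));  // reluctant: ""
    }
}
```

In CEP terms, this is why a greedy {{b*}} with skip-till-next on {{a b1 b2 c}} emits only the longest match {{a b1 b2 c}} rather than also emitting the shorter partial matches.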
[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113916#comment-16113916 ] ASF GitHub Bot commented on FLINK-7367: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4473 @tzulitai Hi Gordon, shall we add more explanation of these keys to the Flink docs, or do you think the Javadoc is sufficient?

> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
> ---------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-7367
> URL: https://issues.apache.org/jira/browse/FLINK-7367
> Project: Flink
> Issue Type: Improvement
> Components: Kinesis Connector
> Affects Versions: 1.3.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Fix For: 1.3.3
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> However, according to the [AWS doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] and [their sample on github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties], developers can set more of them to make the most of KinesisProducer and to make it fault-tolerant (e.g. by increasing the timeout).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> We need to parameterize FlinkKinesisProducer to pass in the above params, in order to cater to our needs.
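In practice this means a user would pass the extra KPL settings through the same java.util.Properties object that FlinkKinesisProducer already accepts. A hedged sketch; the key strings below follow the KPL sample config file and are illustrative, not necessarily the exact constant names the PR adds:

```java
import java.util.Properties;

// Illustrative sketch of a producer-config Properties object; key names
// are taken from the KPL sample default_config.properties and may differ
// from the constants this PR introduces.
public class KinesisProducerConfig {
    public static Properties producerConfig() {
        Properties config = new Properties();
        config.setProperty("aws.region", "us-east-1");       // illustrative region key
        config.setProperty("RequestTimeout", "10000");       // ms; raised to tolerate slow responses
        config.setProperty("RecordMaxBufferedTime", "200");  // ms spent buffering before send
        config.setProperty("MaxConnections", "48");          // parallel HTTP connections
        config.setProperty("RateLimit", "100");              // % of shard throughput to use
        config.setProperty("RecordTtl", "60000");            // ms before a buffered record expires
        return config;
    }

    public static void main(String[] args) {
        // The Properties would then be handed to the FlinkKinesisProducer constructor.
        System.out.println(producerConfig().getProperty("RequestTimeout"));
    }
}
```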
[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113914#comment-16113914 ] ASF GitHub Bot commented on FLINK-7367: --- GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/4473 [FLINK-7367][kinesis connector] Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)

## What is the purpose of the change

Right now, FlinkKinesisProducer only exposes two configs for the underlying KinesisProducer:
- AGGREGATION_MAX_COUNT
- COLLECTION_MAX_COUNT

However, according to the AWS doc and their sample on GitHub, developers can set more of them to make the most of KinesisProducer and to make it fault-tolerant (e.g. by increasing the timeout). I selected a few more configs that we need when using Flink with Kinesis:
- MAX_CONNECTIONS
- RATE_LIMIT
- RECORD_MAX_BUFFERED_TIME
- RECORD_TIME_TO_LIVE
- REQUEST_TIMEOUT

We need to parameterize FlinkKinesisProducer to pass in the above params, in order to cater to our needs.

## Brief change log

- *Added more config values into `ProducerConfigConstants`*
- *Made FlinkKinesisProducer pick up more configs*
- *Added an example in the doc*

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

- Does this pull request introduce a new feature? (no)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bowenli86/flink FLINK-7363

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4473.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4473

commit ac0b4d22fde763dde62b26ed9a022d537bb29e58
Author: Bowen Li
Date: 2017-08-04T03:59:02Z
FLINK-7367 Parameterize FlinkKinesisProducer on RecordMaxBufferedTime, MaxConnections, RequestTimeout, and more

> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
> ---------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-7367
> URL: https://issues.apache.org/jira/browse/FLINK-7367
> Project: Flink
> Issue Type: Improvement
> Components: Kinesis Connector
> Affects Versions: 1.3.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Fix For: 1.3.3
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> However, according to the [AWS doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] and [their sample on github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties], developers can set more of them to make the most of KinesisProducer and to make it fault-tolerant (e.g. by increasing the timeout).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> We need to parameterize FlinkKinesisProducer to pass in the above params, in order to cater to our needs.
[jira] [Updated] (FLINK-7368) MetricStore makes cpu spin at 100%
[ https://issues.apache.org/jira/browse/FLINK-7368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Chen updated FLINK-7368: - Description: Flink's `MetricStore` is not thread-safe. Multiple threads may access the Java HashMap inside `MetricStore` and can trigger HashMap's infinite loop. Recently I hit a case where the Flink JobManager consumed 100% CPU. A part of the stack trace is shown below. The full jstack is in the attachment.
{code:java}
"ForkJoinPool-1-worker-19" daemon prio=10 tid=0x7fbdacac9800 nid=0x64c1 runnable [0x7fbd7d1c2000]
   java.lang.Thread.State: RUNNABLE
    at java.util.HashMap.put(HashMap.java:494)
    at org.apache.flink.runtime.webmonitor.metrics.MetricStore.addMetric(MetricStore.java:176)
    at org.apache.flink.runtime.webmonitor.metrics.MetricStore.add(MetricStore.java:121)
    at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.addMetrics(MetricFetcher.java:198)
    at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.access$500(MetricFetcher.java:58)
    at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher$4.onSuccess(MetricFetcher.java:188)
    at akka.dispatch.OnSuccess.internal(Future.scala:212)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
    at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117)
    at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1265)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
    at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
    at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
    at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
{code}
There are 24 threads showing the same stack trace as above, indicating they are spinning at HashMap.put(HashMap.java:494) (I am using Java 1.7.0_6). Many posts indicate that multiple threads accessing a HashMap can cause this problem, and I reproduced the case as well. The test code is attached. I only modified HashMap.transfer() by adding concurrency barriers for different threads in order to simulate the timing of the creation of cycles in HashMap's Entry chain. My program's stack trace shows it hangs at the same line of HashMap (HashMap.put(HashMap.java:494)) as the stack trace I posted above. Even though `MetricFetcher` has a 10-second minimum interval between metrics queries, it still cannot guarantee that query responses do not access `MetricStore`'s HashMap concurrently. Thus I think it's a bug to fix. was: Flink's `MetricStore` is not thread-safe. Multiple threads may access the Java HashMap inside `MetricStore` and can trigger HashMap's infinite loop. Recently I hit a case where the Flink JobManager consumed 100% CPU. A part of the stack trace is shown below. The full jstack is in the attachment.
{code:java} "ForkJoinPool-1-worker-19" daemon prio=10 tid=0x7fbdacac9800 nid=0x64c1 runnable [0x7fbd7d1c2000] java.lang.Thread.State: RUNNABLE at java.util.HashMap.put(HashMap.java:494) at org.apache.flink.runtime.webmonitor.metrics.MetricStore.addMetric(MetricStore.java:176) at org.apache.flink.runtime.webmonitor.metrics.MetricStore.add(MetricStore.java:121) at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.addMetrics(MetricFetcher.java:198) at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.access$500(MetricFetcher.java:58) at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher$4.onSuccess(MetricFetcher.java:188) at akka.dispatch.OnSuccess.internal(Future.scala:212) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28) at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117) at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1265) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334) at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604) at java.util.c
[jira] [Updated] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-7367: Summary: Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc) (was: Parameterize FlinkKinesisProducer on RecordMaxBufferedTime, MaxConnections, RequestTimeout, and more)

> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
> ---------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-7367
> URL: https://issues.apache.org/jira/browse/FLINK-7367
> Project: Flink
> Issue Type: Improvement
> Components: Kinesis Connector
> Affects Versions: 1.3.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Fix For: 1.3.3
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> However, according to the [AWS doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] and [their sample on github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties], developers can set more of them to make the most of KinesisProducer and to make it fault-tolerant (e.g. by increasing the timeout).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> We need to parameterize FlinkKinesisProducer to pass in the above params, in order to cater to our needs.
[jira] [Updated] (FLINK-7367) Parameterize FlinkKinesisProducer on RecordMaxBufferedTime, MaxConnections, RequestTimeout, and more
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-7367: Description: Right now, FlinkKinesisProducer only exposes two configs for the underlying KinesisProducer:
- AGGREGATION_MAX_COUNT
- COLLECTION_MAX_COUNT
However, according to the [AWS doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] and [their sample on github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties], developers can set more of them to make the most of KinesisProducer and to make it fault-tolerant (e.g. by increasing the timeout). I selected a few more configs that we need when using Flink with Kinesis:
- MAX_CONNECTIONS
- RATE_LIMIT
- RECORD_MAX_BUFFERED_TIME
- RECORD_TIME_TO_LIVE
- REQUEST_TIMEOUT
We need to parameterize FlinkKinesisProducer to pass in the above params, in order to cater to our needs. was: Right now, FlinkKinesisProducer only exposes two configs for the underlying KinesisProducer:
- AGGREGATION_MAX_COUNT
- COLLECTION_MAX_COUNT
However, according to the [AWS doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] and [their sample on github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties], developers can set more of them to make the most of KinesisProducer and to make it fault-tolerant (e.g. by increasing the timeout). We need to parameterize FlinkKinesisProducer to pass in the params listed in the AWS doc and sample, in order to cater to our needs.

> Parameterize FlinkKinesisProducer on RecordMaxBufferedTime, MaxConnections, RequestTimeout, and more
> ----------------------------------------------------------------------------------------------------
>
> Key: FLINK-7367
> URL: https://issues.apache.org/jira/browse/FLINK-7367
> Project: Flink
> Issue Type: Improvement
> Components: Kinesis Connector
> Affects Versions: 1.3.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Fix For: 1.3.3
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> However, according to the [AWS doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] and [their sample on github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties], developers can set more of them to make the most of KinesisProducer and to make it fault-tolerant (e.g. by increasing the timeout).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> We need to parameterize FlinkKinesisProducer to pass in the above params, in order to cater to our needs.
[jira] [Updated] (FLINK-7367) Parameterize FlinkKinesisProducer on RecordMaxBufferedTime, MaxConnections, RequestTimeout, and more
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-7367: Description: Right now, FlinkKinesisProducer only exposes two configs for the underlying KinesisProducer:
- AGGREGATION_MAX_COUNT
- COLLECTION_MAX_COUNT
However, according to the [AWS doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] and [their sample on github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties], developers can set more of them to make the most of KinesisProducer and to make it fault-tolerant (e.g. by increasing the timeout). We need to parameterize FlinkKinesisProducer to pass in the params listed in the AWS doc and sample, in order to cater to our needs. was: Right now, FlinkKinesisProducer only exposes two configs for the underlying KinesisProducer:
- AGGREGATION_MAX_COUNT
- COLLECTION_MAX_COUNT
However, according to the [AWS doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] and [their sample on github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties], developers can set more of them to make the most of KinesisProducer and to make it fault-tolerant (e.g. by increasing the timeout). We need to parameterize FlinkKinesisProducer in order to pass in the params listed in the AWS doc and sample.

> Parameterize FlinkKinesisProducer on RecordMaxBufferedTime, MaxConnections, RequestTimeout, and more
> ----------------------------------------------------------------------------------------------------
>
> Key: FLINK-7367
> URL: https://issues.apache.org/jira/browse/FLINK-7367
> Project: Flink
> Issue Type: Improvement
> Components: Kinesis Connector
> Affects Versions: 1.3.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Fix For: 1.3.3
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> However, according to the [AWS doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] and [their sample on github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties], developers can set more of them to make the most of KinesisProducer and to make it fault-tolerant (e.g. by increasing the timeout).
> We need to parameterize FlinkKinesisProducer to pass in the params listed in the AWS doc and sample, in order to cater to our needs.
[jira] [Updated] (FLINK-7368) MetricStore makes cpu spin at 100%
[ https://issues.apache.org/jira/browse/FLINK-7368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Chen updated FLINK-7368: - Description: Flink's `MetricStore` is not thread-safe. Multiple threads may access the Java HashMap inside `MetricStore` and can trigger HashMap's infinite loop. Recently I hit a case where the Flink JobManager consumed 100% CPU. A part of the stack trace is shown below. The full jstack is in the attachment.
{code:java}
"ForkJoinPool-1-worker-19" daemon prio=10 tid=0x7fbdacac9800 nid=0x64c1 runnable [0x7fbd7d1c2000]
   java.lang.Thread.State: RUNNABLE
    at java.util.HashMap.put(HashMap.java:494)
    at org.apache.flink.runtime.webmonitor.metrics.MetricStore.addMetric(MetricStore.java:176)
    at org.apache.flink.runtime.webmonitor.metrics.MetricStore.add(MetricStore.java:121)
    at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.addMetrics(MetricFetcher.java:198)
    at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.access$500(MetricFetcher.java:58)
    at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher$4.onSuccess(MetricFetcher.java:188)
    at akka.dispatch.OnSuccess.internal(Future.scala:212)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
    at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117)
    at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1265)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
    at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
    at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
    at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
{code}
There are 24 threads showing the same stack trace as above, indicating they are spinning at HashMap.put(HashMap.java:494) (I am using Java 1.7.0_6). Many posts indicate that multiple threads accessing a HashMap can cause this problem, and I reproduced the case as well. The test code is attached. I only modified HashMap.transfer() by adding concurrency barriers for different threads in order to simulate the timing of the creation of cycles in HashMap's Entry chain. Even though `MetricFetcher` has a 10-second minimum interval between metrics queries, it still cannot guarantee that query responses do not access `MetricStore`'s HashMap concurrently. Thus I think it's a bug to fix. was: Flink's `MetricStore` is not thread-safe. Multiple threads may access the Java HashMap inside `MetricStore` and can trigger HashMap's infinite loop. Recently I hit a case where the Flink JobManager consumed 100% CPU. A part of the stack trace is shown below. The full jstack is in the attachment.
{code:java} "ForkJoinPool-1-worker-19" daemon prio=10 tid=0x7fbdacac9800 nid=0x64c1 runnable [0x7fbd7d1c2000] java.lang.Thread.State: RUNNABLE at java.util.HashMap.put(HashMap.java:494) at org.apache.flink.runtime.webmonitor.metrics.MetricStore.addMetric(MetricStore.java:176) at org.apache.flink.runtime.webmonitor.metrics.MetricStore.add(MetricStore.java:121) at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.addMetrics(MetricFetcher.java:198) at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.access$500(MetricFetcher.java:58) at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher$4.onSuccess(MetricFetcher.java:188) at akka.dispatch.OnSuccess.internal(Future.scala:212) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28) at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117) at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1265) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334) at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604) at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784) at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
[jira] [Updated] (FLINK-7368) MetricStore makes cpu spin at 100%
[ https://issues.apache.org/jira/browse/FLINK-7368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Chen updated FLINK-7368: - Attachment: MyHashMap.java MyHashMapInfiniteLoopTest.java MyHashMap is a copy of Java's HashMap with 2 concurrency barriers added in the transfer() method. MyHashMapInfiniteLoopTest is for the unit test.

> MetricStore makes cpu spin at 100%
> ----------------------------------
>
> Key: FLINK-7368
> URL: https://issues.apache.org/jira/browse/FLINK-7368
> Project: Flink
> Issue Type: Bug
> Components: Metrics
> Reporter: Nico Chen
> Attachments: jm-jstack.log, MyHashMapInfiniteLoopTest.java, MyHashMap.java
>
> Flink's `MetricStore` is not thread-safe. Multiple threads may access the Java HashMap inside `MetricStore` and can trigger HashMap's infinite loop.
> Recently I hit a case where the Flink JobManager consumed 100% CPU. A part of the stack trace is shown below. The full jstack is in the attachment.
> {code:java}
> "ForkJoinPool-1-worker-19" daemon prio=10 tid=0x7fbdacac9800 nid=0x64c1 runnable [0x7fbd7d1c2000]
>    java.lang.Thread.State: RUNNABLE
>     at java.util.HashMap.put(HashMap.java:494)
>     at org.apache.flink.runtime.webmonitor.metrics.MetricStore.addMetric(MetricStore.java:176)
>     at org.apache.flink.runtime.webmonitor.metrics.MetricStore.add(MetricStore.java:121)
>     at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.addMetrics(MetricFetcher.java:198)
>     at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.access$500(MetricFetcher.java:58)
>     at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher$4.onSuccess(MetricFetcher.java:188)
>     at akka.dispatch.OnSuccess.internal(Future.scala:212)
>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
>     at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117)
>     at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115)
>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>     at java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1265)
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
>     at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
>     at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
>     at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
>     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
> {code}
> There are 24 threads showing the same stack trace as above, indicating they are spinning at HashMap.put(HashMap.java:494) (I am using Java 1.7.0_6). Many posts indicate that multiple threads accessing a HashMap can cause this problem, and I reproduced the case as well. The test code is attached. I mainly modified HashMap.transfer() by adding concurrency barriers for different threads in order to simulate the timing of the creation of the cyclic linked list.
> Even though `MetricFetcher` has a 10-second minimum interval between metrics queries, it still cannot guarantee that query responses do not access `MetricStore`'s HashMap concurrently.
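One obvious direction for a fix is to back the store with a ConcurrentHashMap, whose internal table cannot be corrupted into a cycle by concurrent puts the way a plain HashMap's can. A minimal sketch with illustrative names, not Flink's actual MetricStore code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

// Hypothetical thread-safe variant of the store: ConcurrentHashMap allows
// concurrent addMetric() calls without the resize-time corruption that
// makes java.util.HashMap spin at 100% CPU.
public class SafeMetricStore {
    private final Map<String, String> metrics = new ConcurrentHashMap<>();

    public void addMetric(String name, String value) {
        metrics.put(name, value);
    }

    public int size() {
        return metrics.size();
    }

    public static void main(String[] args) throws InterruptedException {
        SafeMetricStore store = new SafeMetricStore();
        int threads = 8, perThread = 1000;
        CountDownLatch done = new CountDownLatch(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            new Thread(() -> {
                // Each thread writes distinct keys, forcing many resizes.
                for (int i = 0; i < perThread; i++) {
                    store.addMetric("t" + id + ".m" + i, "v");
                }
                done.countDown();
            }).start();
        }
        done.await();
        System.out.println(store.size()); // 8000: no lost updates, no hang
    }
}
```

Synchronizing the existing HashMap accesses would also remove the hang; ConcurrentHashMap simply avoids serializing readers against the fetcher's writes.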
[jira] [Updated] (FLINK-7368) MetricStore makes cpu spin at 100%
[ https://issues.apache.org/jira/browse/FLINK-7368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Chen updated FLINK-7368: - Description: Flink's `MetricStore` is not thread-safe. Multiple threads may access the Java HashMap inside `MetricStore` and can trigger HashMap's infinite loop. Recently I hit a case where the Flink JobManager consumed 100% CPU. A part of the stack trace is shown below. The full jstack is in the attachment.
{code:java}
"ForkJoinPool-1-worker-19" daemon prio=10 tid=0x7fbdacac9800 nid=0x64c1 runnable [0x7fbd7d1c2000]
   java.lang.Thread.State: RUNNABLE
    at java.util.HashMap.put(HashMap.java:494)
    at org.apache.flink.runtime.webmonitor.metrics.MetricStore.addMetric(MetricStore.java:176)
    at org.apache.flink.runtime.webmonitor.metrics.MetricStore.add(MetricStore.java:121)
    at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.addMetrics(MetricFetcher.java:198)
    at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.access$500(MetricFetcher.java:58)
    at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher$4.onSuccess(MetricFetcher.java:188)
    at akka.dispatch.OnSuccess.internal(Future.scala:212)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
    at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117)
    at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1265)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
    at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
    at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
    at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
{code}
There are 24 threads showing the same stack trace as above, indicating they are spinning at HashMap.put(HashMap.java:494) (I am using Java 1.7.0_6). Many posts indicate that multiple threads accessing a HashMap can cause this problem, and I reproduced the case as well. The test code is attached. I mainly modified HashMap.transfer() by adding concurrency barriers for different threads in order to simulate the timing of the creation of the cyclic linked list. Even though `MetricFetcher` has a 10-second minimum interval between metrics queries, it still cannot guarantee that query responses do not access `MetricStore`'s HashMap concurrently. was: Flink's `MetricStore` is not thread-safe. Multiple threads may access the Java HashMap inside `MetricStore` and can trigger HashMap's infinite loop. Recently I hit a case where the Flink JobManager consumed 100% CPU. A part of the stack trace is shown below. The full jstack is in the attachment.
{code:java}
"ForkJoinPool-1-worker-19" daemon prio=10 tid=0x7fbdacac9800 nid=0x64c1 runnable [0x7fbd7d1c2000]
   java.lang.Thread.State: RUNNABLE
    at java.util.HashMap.put(HashMap.java:494)
    at org.apache.flink.runtime.webmonitor.metrics.MetricStore.addMetric(MetricStore.java:176)
    at org.apache.flink.runtime.webmonitor.metrics.MetricStore.add(MetricStore.java:121)
    at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.addMetrics(MetricFetcher.java:198)
    at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.access$500(MetricFetcher.java:58)
    at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher$4.onSuccess(MetricFetcher.java:188)
    at akka.dispatch.OnSuccess.internal(Future.scala:212)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
    at
scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117) at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1265) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334) at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604) at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784) at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646) at java.util.concurrent.ForkJo
[jira] [Resolved] (FLINK-7356) misleading s3 file uri in configuration file
[ https://issues.apache.org/jira/browse/FLINK-7356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai resolved FLINK-7356. Resolution: Fixed Thanks for the contribution [~phoenixjiangnan]. Fixed in master via 40901ed5866b95b19ba056575a7480fa95b40c68. Fixed in 1.3 via 37ebcb67da64dd677f1e6d8467f690dcbc6d3718. > misleading s3 file uri in configuration file > > > Key: FLINK-7356 > URL: https://issues.apache.org/jira/browse/FLINK-7356 > Project: Flink > Issue Type: Bug > Components: Configuration >Affects Versions: 1.3.1 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.4.0, 1.3.3 > > > in > https://github.com/apache/flink/blob/master/flink-dist/src/main/resources/flink-conf.yaml, > the comment in line 121 should say {{"*s3*://" for S3 file system}} rather > than {{"S3://" for S3 file system}}, because {{S3://xxx}} is not recognized > by AWS SDK. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-7127) Remove unnecessary null check or add null check
[ https://issues.apache.org/jira/browse/FLINK-7127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai resolved FLINK-7127. Resolution: Fixed Assignee: Dmitrii Kniazev Fix Version/s: 1.3.3 1.4.0 Thanks for the contribution [~mylog00]. Gentle reminder: you should assign the ticket to yourself if you decide to work on it. That prevents someone else from also picking it up and ending up with duplicated effort. Fixed in master via bef7484610dd206d7c749e32f5d91a0f67ef405a. Fixed in 1.3 via 89659c6d52ac5582cc76a50f912fa4cddccddc77. > Remove unnecessary null check or add null check > --- > > Key: FLINK-7127 > URL: https://issues.apache.org/jira/browse/FLINK-7127 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Assignee: Dmitrii Kniazev >Priority: Trivial > Labels: starter > Fix For: 1.4.0, 1.3.3 > > > In {{HeapKeyedStateBackend#snapshot}} we have: > {code} > for (Map.Entry<String, StateTable<K, ?>> kvState : stateTables.entrySet()) > { > // 1) Here we don't check for null > metaInfoSnapshots.add(kvState.getValue().getMetaInfo().snapshot()); > kVStateToId.put(kvState.getKey(), kVStateToId.size()); > // 2) Here we check for null > StateTable<K, ?> stateTable = kvState.getValue(); > if (null != stateTable) { > cowStateStableSnapshots.put(stateTable, > stateTable.createSnapshot()); > } > } > {code} > Either this can lead to an NPE and we should add the check in 1), or we should remove the > null check in 2). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
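The inconsistency the ticket describes is that the map value is dereferenced without a guard in 1) but null-checked in 2). The usual repair is to read the value once into a local variable and guard it before any dereference. A minimal, self-contained sketch of that pattern (the types here are simplified stand-ins, not the actual Flink classes):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the snapshot loop in HeapKeyedStateBackend#snapshot.
// The value is read once and null-checked before ANY dereference, so both
// former uses (meta-info access and snapshot creation) are guarded consistently.
class SnapshotLoop {

    // Minimal stand-in for Flink's StateTable; only the two calls from the
    // quoted snippet are modeled.
    static class StateTable {
        Object getMetaInfo() { return new Object(); }
        Object createSnapshot() { return new Object(); }
    }

    static int snapshot(Map<String, StateTable> stateTables) {
        int snapshots = 0;
        for (Map.Entry<String, StateTable> kvState : stateTables.entrySet()) {
            StateTable stateTable = kvState.getValue();
            if (stateTable == null) {
                continue; // skip instead of risking an NPE on getMetaInfo()
            }
            stateTable.getMetaInfo();    // previously the unguarded dereference
            stateTable.createSnapshot(); // previously the guarded one
            snapshots++;
        }
        return snapshots;
    }
}
```

With one entry mapped to a real table and another mapped to null, `snapshot` returns 1 and never throws, which is the behavior the ticket asks to make consistent.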
[jira] [Resolved] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
[ https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai resolved FLINK-6493. Resolution: Fixed Fix Version/s: 1.3.3 Thanks for the contribution [~mingleizhang]! Fixed in master via 875a1369c2ecfd721e3797f59f7ae29e7c522840. Fixed in 1.3 via 856d44cdb6147970f0a5912652bedbfaf8f9ca33. > Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals() > - > > Key: FLINK-6493 > URL: https://issues.apache.org/jira/browse/FLINK-6493 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > Fix For: 1.4.0, 1.3.3 > > > {code} > && ((partitionStateSerializer == null && ((Snapshot) > obj).getPartitionStateSerializer() == null) > || partitionStateSerializer.equals(((Snapshot) > obj).getPartitionStateSerializer())) > && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot() == null) > || partitionStateSerializerConfigSnapshot.equals(((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot())); > {code} > The null check for partitionStateSerializer / > partitionStateSerializerConfigSnapshot is in combination with another clause. > This may lead to NPE in the partitionStateSerializer.equals() call. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
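The quoted condition only short-circuits when *both* sides are null; when only the left-hand serializer is null, the `||` branch still calls `equals()` on it and throws. A small self-contained sketch of the bug and the standard `java.util.Objects.equals` repair (plain `Object` fields stand in for the serializer and config-snapshot fields):

```java
import java.util.Objects;

// Illustrates why the null check in RegisteredOperatorBackendStateMetaInfo#equals()
// is ineffective, and the usual null-safe replacement.
class NullSafeEquals {

    // Broken pattern from the ticket: if left is null and right is non-null,
    // the first clause is false, so evaluation falls through to
    // left.equals(right) and throws a NullPointerException.
    static boolean brokenEquals(Object left, Object right) {
        return (left == null && right == null) || left.equals(right);
    }

    // Fixed: Objects.equals handles all four null/non-null combinations
    // without ever dereferencing a null reference.
    static boolean safeEquals(Object left, Object right) {
        return Objects.equals(left, right);
    }
}
```

`safeEquals(null, null)` is true, `safeEquals(null, "x")` is false, while `brokenEquals(null, "x")` throws — exactly the NPE the ticket warns about.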
[jira] [Commented] (FLINK-7368) MetricStore makes cpu spin at 100%
[ https://issues.apache.org/jira/browse/FLINK-7368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113865#comment-16113865 ] ASF GitHub Bot commented on FLINK-7368: --- GitHub user nicochen opened a pull request: https://github.com/apache/flink/pull/4472 FLINK-7368: MetricStore makes cpu spin at 100% Flink's `MetricStore` is not thread-safe. Multiple threads may access the Java HashMap inside `MetricStore` concurrently, which can trigger HashMap's infinite loop. Recently I hit a case where the Flink JobManager consumed 100% CPU. A part of the stack trace is shown below. The full jstack output is in the attachment.
{code:java}
"ForkJoinPool-1-worker-19" daemon prio=10 tid=0x7fbdacac9800 nid=0x64c1 runnable [0x7fbd7d1c2000]
   java.lang.Thread.State: RUNNABLE
        at java.util.HashMap.put(HashMap.java:494)
        at org.apache.flink.runtime.webmonitor.metrics.MetricStore.addMetric(MetricStore.java:176)
        at org.apache.flink.runtime.webmonitor.metrics.MetricStore.add(MetricStore.java:121)
        at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.addMetrics(MetricFetcher.java:198)
        at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.access$500(MetricFetcher.java:58)
        at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher$4.onSuccess(MetricFetcher.java:188)
        at akka.dispatch.OnSuccess.internal(Future.scala:212)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
        at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117)
        at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1265)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
        at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
        at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
        at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
{code}
There are 24 threads showing the same stack trace as above, indicating that they are spinning at HashMap.put(HashMap.java:494) (I am using Java 1.7.0_6). Many posts indicate that multiple threads accessing a HashMap concurrently cause this problem, and I reproduced the case as well. Even though `MetricFetcher` enforces a minimum interval of 10 seconds between metrics queries, it still cannot guarantee that query responses do not access `MetricStore`'s HashMap concurrently. You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/nicochen/flink FLINK-7368
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/flink/pull/4472.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #4472
commit abfa571fbf99be4b98d8d690ed10df1440dd21d5 Author: nicochen2012 <16100...@cnsuning.com> Date: 2017-08-04T03:21:49Z FLINK-7368: MetricStore makes cpu spin at 100%
> MetricStore makes cpu spin at 100% > -- > > Key: FLINK-7368 > URL: https://issues.apache.org/jira/browse/FLINK-7368 > Project: Flink > Issue Type: Bug > Components: Metrics >Reporter: Nico Chen > Attachments: jm-jstack.log > > > Flink's `MetricStore` is not thread-safe. Multiple threads may access the Java > HashMap inside `MetricStore` concurrently, which can trigger HashMap's infinite loop. > Recently I hit a case where the Flink JobManager consumed 100% CPU. A part of > the stack trace is shown below. The full jstack output is in the attachment. 
> {code:java} > "ForkJoinPool-1-worker-19" daemon prio=10 tid=0x7fbdacac9800 nid=0x64c1 > runnable [0x7fbd7d1c2000] >java.lang.Thread.State: RUNNABLE > at java.util.HashMap.put(HashMap.java:494) > at > org.apache.flink.runtime.webmonitor.metrics.MetricStore.addMetric(MetricStore.java:176) > at > org.apache.flink.runtime.webmonitor.metrics.MetricStore.add(MetricStore.java:121) > at > org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.addMetrics(MetricFetcher.java:198) > at > org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.access$500(MetricFetcher.java:58) > at > org.apache.flink.runtime.webmonitor.metrics.MetricFetcher$4.on
[GitHub] flink pull request #4472: FLINK-7368: MetricStore makes cpu spin at 100%
GitHub user nicochen opened a pull request: https://github.com/apache/flink/pull/4472 FLINK-7368: MetricStore makes cpu spin at 100% Flink's `MetricStore` is not thread-safe. Multiple threads may access the Java HashMap inside `MetricStore` concurrently, which can trigger HashMap's infinite loop. Recently I hit a case where the Flink JobManager consumed 100% CPU. A part of the stack trace is shown below. The full jstack output is in the attachment.
{code:java}
"ForkJoinPool-1-worker-19" daemon prio=10 tid=0x7fbdacac9800 nid=0x64c1 runnable [0x7fbd7d1c2000]
   java.lang.Thread.State: RUNNABLE
        at java.util.HashMap.put(HashMap.java:494)
        at org.apache.flink.runtime.webmonitor.metrics.MetricStore.addMetric(MetricStore.java:176)
        at org.apache.flink.runtime.webmonitor.metrics.MetricStore.add(MetricStore.java:121)
        at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.addMetrics(MetricFetcher.java:198)
        at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.access$500(MetricFetcher.java:58)
        at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher$4.onSuccess(MetricFetcher.java:188)
        at akka.dispatch.OnSuccess.internal(Future.scala:212)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
        at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117)
        at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1265)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
        at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
        at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
        at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
{code}
There are 24 threads showing the same stack trace as above, indicating that they are spinning at HashMap.put(HashMap.java:494) (I am using Java 1.7.0_6). Many posts indicate that multiple threads accessing a HashMap concurrently cause this problem, and I reproduced the case as well. Even though `MetricFetcher` enforces a minimum interval of 10 seconds between metrics queries, it still cannot guarantee that query responses do not access `MetricStore`'s HashMap concurrently. You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/nicochen/flink FLINK-7368
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/flink/pull/4472.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #4472
commit abfa571fbf99be4b98d8d690ed10df1440dd21d5 Author: nicochen2012 <16100...@cnsuning.com> Date: 2017-08-04T03:21:49Z FLINK-7368: MetricStore makes cpu spin at 100% --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-7368) MetricStore makes cpu spin at 100%
[ https://issues.apache.org/jira/browse/FLINK-7368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Chen updated FLINK-7368: - Attachment: jm-jstack.log
> MetricStore makes cpu spin at 100%
> --
>
> Key: FLINK-7368
> URL: https://issues.apache.org/jira/browse/FLINK-7368
> Project: Flink
> Issue Type: Bug
> Components: Metrics
> Reporter: Nico Chen
> Attachments: jm-jstack.log
>
>
> Flink's `MetricStore` is not thread-safe. Multiple threads may access the Java HashMap inside `MetricStore` concurrently, which can trigger HashMap's infinite loop.
> Recently I hit a case where the Flink JobManager consumed 100% CPU. A part of the stack trace is shown below. The full jstack output is in the attachment.
> {code:java}
> "ForkJoinPool-1-worker-19" daemon prio=10 tid=0x7fbdacac9800 nid=0x64c1 runnable [0x7fbd7d1c2000]
>    java.lang.Thread.State: RUNNABLE
>         at java.util.HashMap.put(HashMap.java:494)
>         at org.apache.flink.runtime.webmonitor.metrics.MetricStore.addMetric(MetricStore.java:176)
>         at org.apache.flink.runtime.webmonitor.metrics.MetricStore.add(MetricStore.java:121)
>         at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.addMetrics(MetricFetcher.java:198)
>         at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.access$500(MetricFetcher.java:58)
>         at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher$4.onSuccess(MetricFetcher.java:188)
>         at akka.dispatch.OnSuccess.internal(Future.scala:212)
>         at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
>         at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>         at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
>         at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117)
>         at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115)
>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>         at java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1265)
>         at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
>         at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
>         at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
>         at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
>         at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
> {code}
> There are 24 threads showing the same stack trace as above, indicating that they are spinning at HashMap.put(HashMap.java:494) (I am using Java 1.7.0_6). Many posts indicate that multiple threads accessing a HashMap concurrently cause this problem, and I reproduced the case as well. Even though `MetricFetcher` enforces a minimum interval of 10 seconds between metrics queries, it still cannot guarantee that query responses do not access `MetricStore`'s HashMap concurrently.
>
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7368) MetricStore makes cpu spin at 100%
[ https://issues.apache.org/jira/browse/FLINK-7368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Chen updated FLINK-7368: - Description: Flink's `MetricStore` is not thread-safe. Multiple threads may access the Java HashMap inside `MetricStore` concurrently, which can trigger HashMap's infinite loop. Recently I hit a case where the Flink JobManager consumed 100% CPU. A part of the stack trace is shown below. The full jstack output is in the attachment.
{code:java}
"ForkJoinPool-1-worker-19" daemon prio=10 tid=0x7fbdacac9800 nid=0x64c1 runnable [0x7fbd7d1c2000]
   java.lang.Thread.State: RUNNABLE
        at java.util.HashMap.put(HashMap.java:494)
        at org.apache.flink.runtime.webmonitor.metrics.MetricStore.addMetric(MetricStore.java:176)
        at org.apache.flink.runtime.webmonitor.metrics.MetricStore.add(MetricStore.java:121)
        at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.addMetrics(MetricFetcher.java:198)
        at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.access$500(MetricFetcher.java:58)
        at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher$4.onSuccess(MetricFetcher.java:188)
        at akka.dispatch.OnSuccess.internal(Future.scala:212)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
        at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117)
        at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1265)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
        at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
        at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
        at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
{code}
There are 24 threads showing the same stack trace as above, indicating that they are spinning at HashMap.put(HashMap.java:494) (I am using Java 1.7.0_6). Many posts indicate that multiple threads accessing a HashMap concurrently cause this problem, and I reproduced the case as well. Even though `MetricFetcher` enforces a minimum interval of 10 seconds between metrics queries, it still cannot guarantee that query responses do not access `MetricStore`'s HashMap concurrently.
[jira] [Created] (FLINK-7368) MetricStore makes cpu spin at 100%
Nico Chen created FLINK-7368:
Summary: MetricStore makes cpu spin at 100%
Key: FLINK-7368
URL: https://issues.apache.org/jira/browse/FLINK-7368
Project: Flink
Issue Type: Bug
Components: Metrics
Reporter: Nico Chen

Flink's `MetricStore` is not thread-safe. Multiple threads may access the Java HashMap inside `MetricStore` concurrently, which can trigger HashMap's infinite loop. Recently I hit a case where the Flink JobManager consumed 100% CPU. A part of the stack trace is shown below. The full jstack output is in the attachment.
{code:java}
"ForkJoinPool-1-worker-19" daemon prio=10 tid=0x7fbdacac9800 nid=0x64c1 runnable [0x7fbd7d1c2000]
   java.lang.Thread.State: RUNNABLE
        at java.util.HashMap.put(HashMap.java:494)
        at org.apache.flink.runtime.webmonitor.metrics.MetricStore.addMetric(MetricStore.java:176)
        at org.apache.flink.runtime.webmonitor.metrics.MetricStore.add(MetricStore.java:121)
        at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.addMetrics(MetricFetcher.java:198)
        at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher.access$500(MetricFetcher.java:58)
        at org.apache.flink.runtime.webmonitor.metrics.MetricFetcher$4.onSuccess(MetricFetcher.java:188)
        at akka.dispatch.OnSuccess.internal(Future.scala:212)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
        at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:117)
        at scala.concurrent.Future$$anonfun$onSuccess$1.apply(Future.scala:115)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at java.util.concurrent.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1265)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
        at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
        at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
        at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
{code}
There are 24 threads showing the same stack trace as above, indicating that they are spinning at HashMap.put(HashMap.java:494) (I am using Java 1.7.0_6). Many posts indicate that multiple threads accessing a HashMap concurrently cause this problem, and I reproduced the case as well. Even though `MetricFetcher` enforces a minimum interval of 10 seconds between metrics queries, it still cannot guarantee that query responses do not access `MetricStore`'s HashMap concurrently. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
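The root cause described above is unsynchronized concurrent `put()` calls on a plain `HashMap`, which on Java 7 can corrupt the bucket chains during resize and make `put()` spin forever. A minimal sketch of the general remedy, using `ConcurrentHashMap` so concurrent writers are safe (this is an illustration of the fix category, not necessarily the exact change merged in the PR; the class and method names are hypothetical stand-ins for `MetricStore`):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for the metric map inside MetricStore. A plain
// HashMap can corrupt its internal bucket chains under concurrent put()
// calls (the spin at HashMap.put seen in the stack trace above);
// ConcurrentHashMap makes individual put() calls thread-safe.
class SafeMetricStore {
    private final Map<String, String> metrics = new ConcurrentHashMap<>();

    void addMetric(String name, String value) {
        metrics.put(name, value);
    }

    int size() {
        return metrics.size();
    }

    public static void main(String[] args) throws InterruptedException {
        final SafeMetricStore store = new SafeMetricStore();
        final int threads = 8, perThread = 1000;
        final CountDownLatch done = new CountDownLatch(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    store.addMetric("thread-" + id + "-metric-" + i, "v");
                }
                done.countDown();
            }).start();
        }
        done.await();
        System.out.println(store.size()); // prints 8000: no lost updates, no spin
    }
}
```

With the plain-HashMap version, the same 8-writer workload can lose updates or hang on Java 7; with `ConcurrentHashMap`, all 8000 distinct keys land safely. Synchronizing all access to a single `HashMap` would also work, at the cost of more contention.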
[jira] [Commented] (FLINK-6429) Bump up Calcite version to 1.13
[ https://issues.apache.org/jira/browse/FLINK-6429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113850#comment-16113850 ] ASF GitHub Bot commented on FLINK-6429: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4373#discussion_r131302891 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala --- @@ -55,12 +55,13 @@ class JoinTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(1), -term("select", "a", "b", "proctime") +term("select", "a", "b", "-(proctime, 360) AS -", --- End diff -- I think it's easy to prevent the time indicator push-down on our side. Calcite 1.13 changed `ProjectJoinTransposeRule.INSTANCE` to `new ProjectJoinTransposeRule(PushProjector.ExprCondition.TRUE, RelFactories.LOGICAL_BUILDER)` instead of `new ProjectJoinTransposeRule(PushProjector.ExprCondition.FALSE, RelFactories.LOGICAL_BUILDER)`. To change the default behavior of the rule, the only thing we need to do is create a new `ProjectJoinTransposeRule` with a custom `PushProjector.ExprCondition` that filters out time indicator rex nodes. > Bump up Calcite version to 1.13 > --- > > Key: FLINK-6429 > URL: https://issues.apache.org/jira/browse/FLINK-6429 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Haohui Mai > > This is an umbrella issue for all tasks that need to be done once Apache > Calcite 1.13 is released. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4373: [FLINK-6429] [table] Bump up Calcite version to 1....
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4373#discussion_r131302891 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala --- @@ -55,12 +55,13 @@ class JoinTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(1), -term("select", "a", "b", "proctime") +term("select", "a", "b", "-(proctime, 360) AS -", --- End diff -- I think it's easy to prevent the time indicator push-down on our side. Calcite 1.13 changed `ProjectJoinTransposeRule.INSTANCE` to `new ProjectJoinTransposeRule(PushProjector.ExprCondition.TRUE, RelFactories.LOGICAL_BUILDER)` instead of `new ProjectJoinTransposeRule(PushProjector.ExprCondition.FALSE, RelFactories.LOGICAL_BUILDER)`. To change the default behavior of the rule, the only thing we need to do is create a new `ProjectJoinTransposeRule` with a custom `PushProjector.ExprCondition` that filters out time indicator rex nodes.
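The suggestion above amounts to supplying the push-down rule with a predicate that vetoes certain expressions. A self-contained analogue in plain Java (no Calcite dependency; `pushable`, the string expressions, and the predicate are illustrative stand-ins, not Calcite's actual `ExprCondition` API) showing the shape of that customization:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Analogue of customizing ProjectJoinTransposeRule via a PushProjector
// condition: only expressions accepted by the predicate are pushed below
// the join; everything else (e.g. time indicators) stays above it.
class PushDownFilter {
    static List<String> pushable(List<String> exprs, Predicate<String> condition) {
        List<String> result = new ArrayList<>();
        for (String e : exprs) {
            if (condition.test(e)) { // the "ExprCondition" decision point
                result.add(e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Stand-in for "this expression is not a time indicator".
        Predicate<String> notTimeIndicator =
                e -> !e.contains("proctime") && !e.contains("rowtime");
        List<String> exprs = new ArrayList<>();
        exprs.add("a");
        exprs.add("b");
        exprs.add("-(proctime, 360)");
        System.out.println(pushable(exprs, notTimeIndicator)); // prints [a, b]
    }
}
```

Passing a condition equivalent to `TRUE` pushes everything (Calcite 1.13's new default), while a condition that rejects time-indicator expressions reproduces the behavior the comment proposes for Flink.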
[GitHub] flink pull request #4349: [FLINK-7127] [runtime] Remove unnecessary null che...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4349
[jira] [Commented] (FLINK-1234) Make Hadoop2 profile default
[ https://issues.apache.org/jira/browse/FLINK-1234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113834#comment-16113834 ] ASF GitHub Bot commented on FLINK-1234: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4408 > Make Hadoop2 profile default > > > Key: FLINK-1234 > URL: https://issues.apache.org/jira/browse/FLINK-1234 > Project: Flink > Issue Type: Improvement >Reporter: Robert Metzger >Assignee: Robert Metzger > Fix For: 0.8.0 > > > As per mailing list discussion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7127) Remove unnecessary null check or add null check
[ https://issues.apache.org/jira/browse/FLINK-7127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113833#comment-16113833 ] ASF GitHub Bot commented on FLINK-7127: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4349 > Remove unnecessary null check or add null check > --- > > Key: FLINK-7127 > URL: https://issues.apache.org/jira/browse/FLINK-7127 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Priority: Trivial > Labels: starter > > In {{HeapKeyedStateBackend#snapshot}} we have: > {code} > for (Map.Entry<String, StateTable<K, ?>> kvState : stateTables.entrySet()) > { > // 1) Here we don't check for null > metaInfoSnapshots.add(kvState.getValue().getMetaInfo().snapshot()); > kVStateToId.put(kvState.getKey(), kVStateToId.size()); > // 2) Here we check for null > StateTable<K, ?> stateTable = kvState.getValue(); > if (null != stateTable) { > cowStateStableSnapshots.put(stateTable, > stateTable.createSnapshot()); > } > } > {code} > Either this can lead to an NPE and we should add the check in 1), or we should remove the > null check in 2). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4408: Release 1.3
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4408
[jira] [Commented] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
[ https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113832#comment-16113832 ] ASF GitHub Bot commented on FLINK-6493: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4328 > Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals() > - > > Key: FLINK-6493 > URL: https://issues.apache.org/jira/browse/FLINK-6493 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > Fix For: 1.4.0 > > > {code} > && ((partitionStateSerializer == null && ((Snapshot) > obj).getPartitionStateSerializer() == null) > || partitionStateSerializer.equals(((Snapshot) > obj).getPartitionStateSerializer())) > && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot() == null) > || partitionStateSerializerConfigSnapshot.equals(((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot())); > {code} > The null check for partitionStateSerializer / > partitionStateSerializerConfigSnapshot is in combination with another clause. > This may lead to NPE in the partitionStateSerializer.equals() call. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7356) misleading s3 file uri in configuration file
[ https://issues.apache.org/jira/browse/FLINK-7356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113831#comment-16113831 ] ASF GitHub Bot commented on FLINK-7356: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4466 > misleading s3 file uri in configuration file > > > Key: FLINK-7356 > URL: https://issues.apache.org/jira/browse/FLINK-7356 > Project: Flink > Issue Type: Bug > Components: Configuration >Affects Versions: 1.3.1 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.4.0, 1.3.3 > > > in > https://github.com/apache/flink/blob/master/flink-dist/src/main/resources/flink-conf.yaml, > the comment in line 121 should say {{"*s3*://" for S3 file system}} rather > than {{"S3://" for S3 file system}}, because {{S3://xxx}} is not recognized > by AWS SDK. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
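A small JDK-only illustration of why the case matters: {{java.net.URI}} preserves the scheme's case, so an uppercase {{S3://}} survives parsing and reaches the file-system lookup unchanged, where (per the ticket) the AWS SDK does not recognize it. The {{normalizeScheme}} helper below is hypothetical, just a sketch of a defensive lowercase normalization:

```java
import java.net.URI;
import java.util.Locale;

class SchemeNormalizer {
    // java.net.URI keeps the scheme exactly as written, so "S3://..." is not
    // silently folded to "s3://...". This hypothetical helper lowercases only
    // the scheme and leaves the rest of the URI untouched.
    static String normalizeScheme(String uri) {
        URI u = URI.create(uri);
        String scheme = u.getScheme();
        if (scheme == null || scheme.equals(scheme.toLowerCase(Locale.ROOT))) {
            return uri; // already lowercase (or scheme-less): nothing to do
        }
        return scheme.toLowerCase(Locale.ROOT) + uri.substring(scheme.length());
    }
}
```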
[GitHub] flink pull request #4466: [FLINK-7356][configuration] misleading s3 file uri...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4466
[GitHub] flink pull request #4328: [FLINK-6493] Fix ineffective null check in Registe...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4328
[jira] [Commented] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
[ https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113827#comment-16113827 ] ASF GitHub Bot commented on FLINK-6493: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4328 Thanks for that @zhangminglei. However, I've already finished testing a batch of commits with this fix included, and would like to merge that now. Sometimes I collect a batch of commits and run them on Travis before merging (that's why it isn't actually merged yet). It's safe to leave some final minor cosmetic fix to the person merging the PR if they mentioned they'll fix it when merging. That'll also avoid duplicate work and wasted efforts. > Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals() > - > > Key: FLINK-6493 > URL: https://issues.apache.org/jira/browse/FLINK-6493 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > Fix For: 1.4.0 > > > {code} > && ((partitionStateSerializer == null && ((Snapshot) > obj).getPartitionStateSerializer() == null) > || partitionStateSerializer.equals(((Snapshot) > obj).getPartitionStateSerializer())) > && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot() == null) > || partitionStateSerializerConfigSnapshot.equals(((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot())); > {code} > The null check for partitionStateSerializer / > partitionStateSerializerConfigSnapshot is in combination with another clause. > This may lead to NPE in the partitionStateSerializer.equals() call. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4328: [FLINK-6493] Fix ineffective null check in RegisteredOper...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4328 Thanks for that @zhangminglei. However, I've already finished testing a batch of commits with this fix included, and would like to merge that now. Sometimes I collect a batch of commits and run them on Travis before merging (that's why it isn't actually merged yet). It's safe to leave some final minor cosmetic fix to the person merging the PR if they mentioned they'll fix it when merging. That'll also avoid duplicate work and wasted efforts.
[jira] [Commented] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
[ https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113808#comment-16113808 ] ASF GitHub Bot commented on FLINK-6493: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/4328 Hi, @tzulitai I have removed the extra most-outer wrapping parenthesis by myself. > Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals() > - > > Key: FLINK-6493 > URL: https://issues.apache.org/jira/browse/FLINK-6493 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > Fix For: 1.4.0 > > > {code} > && ((partitionStateSerializer == null && ((Snapshot) > obj).getPartitionStateSerializer() == null) > || partitionStateSerializer.equals(((Snapshot) > obj).getPartitionStateSerializer())) > && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot() == null) > || partitionStateSerializerConfigSnapshot.equals(((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot())); > {code} > The null check for partitionStateSerializer / > partitionStateSerializerConfigSnapshot is in combination with another clause. > This may lead to NPE in the partitionStateSerializer.equals() call. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4328: [FLINK-6493] Fix ineffective null check in RegisteredOper...
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/4328 Hi @tzulitai, I have removed the extra outermost wrapping parentheses myself.
[jira] [Commented] (FLINK-7293) Support custom order by in PatternStream
[ https://issues.apache.org/jira/browse/FLINK-7293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113804#comment-16113804 ] ASF GitHub Bot commented on FLINK-7293: --- Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4418 @dawidwys Updated the contribution checklist. > Support custom order by in PatternStream > > > Key: FLINK-7293 > URL: https://issues.apache.org/jira/browse/FLINK-7293 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > Currently, when {{ProcessingTime}} is configured, the events are fed to NFA > in the order of the arriving time and when {{EventTime}} is configured, the > events are fed to NFA in the order of the event time. It should also allow > custom {{order by}} to allow users to define the order of the events besides > the above factors. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4418: [FLINK-7293] [cep] Support custom order by in PatternStre...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4418 @dawidwys Updated the contribution checklist.
[jira] [Created] (FLINK-7367) Parameterize FlinkKinesisProducer on RecordMaxBufferedTime, MaxConnections, RequestTimeout, and more
Bowen Li created FLINK-7367: --- Summary: Parameterize FlinkKinesisProducer on RecordMaxBufferedTime, MaxConnections, RequestTimeout, and more Key: FLINK-7367 URL: https://issues.apache.org/jira/browse/FLINK-7367 Project: Flink Issue Type: Improvement Components: Kinesis Connector Affects Versions: 1.3.0 Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.3.3 Right now, FlinkKinesisProducer only exposes two configs for the underlying KinesisProducer: - AGGREGATION_MAX_COUNT - COLLECTION_MAX_COUNT According to the [AWS doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] and [their sample on github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties], developers can set many more of them to make full use of KinesisProducer and to make it fault-tolerant (e.g. by increasing the timeout). We need to parameterize FlinkKinesisProducer so it can pass in the params listed in the AWS doc and sample. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
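The keys named in the summary ({{RecordMaxBufferedTime}}, {{MaxConnections}}, {{RequestTimeout}}) come straight from the KPL's {{default_config.properties}}, so a generic pass-through of {{java.util.Properties}} would cover them all at once. A hypothetical sketch of such a builder (the class name and fluent API are mine, not the eventual Flink interface):

```java
import java.util.Properties;

class KinesisProducerConfigBuilder {
    private final Properties props = new Properties();

    // Generic setter: accepts any key from the KPL's default_config.properties,
    // so the connector never has to enumerate individual configs again.
    KinesisProducerConfigBuilder set(String key, String value) {
        props.setProperty(key, value);
        return this;
    }

    // The resulting Properties would be handed to the underlying KinesisProducer.
    Properties build() {
        return props;
    }
}
```

Usage would look like chaining the ticket's keys: {{builder.set("RecordMaxBufferedTime", "1000").set("RequestTimeout", "6000")}}, with validation of known keys left to the KPL itself.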
[jira] [Updated] (FLINK-7366) Upgrade kinesis-producer-library in flink-connector-kinesis from 0.10.2 to 0.12.5
[ https://issues.apache.org/jira/browse/FLINK-7366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-7366: Description: flink-connector-kinesis currently uses kinesis-producer-library 0.10.2, which was released in Nov 2015 by AWS. It's old. It's the fourth release, and thus problematic. It doesn't even have good retry logic, so Flink fails really frequently (about every 10 mins, as we observed) when Flink writes too fast to Kinesis and receives a RateLimitExceededException. Upgrade it to 0.12.5, released in May 2017, to take advantage of all the new features and improvements. was: flink-connector-kinesis currently uses kinesis-producer-library 0.10.2, which was released in Nov 2015 by AWS. It's old. It's the fourth release, and thus problematic. Upgrade it to 0.12.5, released in May 2017, to take advantage of all the new features and improvements. > Upgrade kinesis-producer-library in flink-connector-kinesis from 0.10.2 to > 0.12.5 > - > > Key: FLINK-7366 > URL: https://issues.apache.org/jira/browse/FLINK-7366 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.4.0 > > > flink-connector-kinesis currently uses kinesis-producer-library 0.10.2, which > was released in Nov 2015 by AWS. It's old. It's the fourth release, and thus > problematic. It doesn't even have good retry logic, so Flink fails > really frequently (about every 10 mins, as we observed) when Flink writes too > fast to Kinesis and receives a RateLimitExceededException. > Upgrade it to 0.12.5, released in May 2017, to take advantage of all the new > features and improvements. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7366) Upgrade kinesis-producer-library in flink-connector-kinesis from 0.10.2 to 0.12.5
Bowen Li created FLINK-7366: --- Summary: Upgrade kinesis-producer-library in flink-connector-kinesis from 0.10.2 to 0.12.5 Key: FLINK-7366 URL: https://issues.apache.org/jira/browse/FLINK-7366 Project: Flink Issue Type: Improvement Components: Kinesis Connector Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.4.0 flink-connector-kinesis currently uses kinesis-producer-library 0.10.2, which was released in Nov 2015 by AWS. It's old. It's the fourth release, and thus problematic. Upgrade it to 0.12.5, released in May 2017, to take advantage of all the new features and improvements. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7365) warning of attempt to override final parameter: fs.s3.buffer.dir
[ https://issues.apache.org/jira/browse/FLINK-7365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-7365: Affects Version/s: 1.3.0 Component/s: Configuration > warning of attempt to override final parameter: fs.s3.buffer.dir > > > Key: FLINK-7365 > URL: https://issues.apache.org/jira/browse/FLINK-7365 > Project: Flink > Issue Type: Bug > Components: Configuration >Affects Versions: 1.3.0 >Reporter: Bowen Li > > I'm seeing hundreds of lines of the following log in my JobManager log file: > {code:java} > 2017-08-03 19:48:45,330 WARN org.apache.hadoop.conf.Configuration > - /usr/lib/hadoop/etc/hadoop/core-site.xml:an attempt to > override final parameter: fs.s3.buffer.dir; Ignoring. > 2017-08-03 19:48:45,485 WARN org.apache.hadoop.conf.Configuration > - /etc/hadoop/conf/core-site.xml:an attempt to override final > parameter: fs.s3.buffer.dir; Ignoring. > 2017-08-03 19:48:45,486 WARN org.apache.hadoop.conf.Configuration > - /usr/lib/hadoop/etc/hadoop/core-site.xml:an attempt to > override final parameter: fs.s3.buffer.dir; Ignoring. > 2017-08-03 19:48:45,626 WARN org.apache.hadoop.conf.Configuration > - /etc/hadoop/conf/core-site.xml:an attempt to override final > parameter: fs.s3.buffer.dir; Ignoring > ..
> {code} > Info of my Flink cluster: > - Running on EMR with emr-5.6.0 > - Using FSStateBackend, writing checkpointing data files to s3 > - Configured s3 with S3AFileSystem according to > https://ci.apache.org/projects/flink/flink-docs-release-1.4/setup/aws.html#set-s3-filesystem > - AWS forbids resetting 'fs.s3.buffer.dir' value (it has a <final> tag on > this property in core-site.xml), so I set 'fs.s3a.buffer.dir' as '/tmp' > Here's my core-site.xml file: > {code:xml}
> <configuration>
>   <property>
>     <name>fs.s3.buffer.dir</name>
>     <value>/mnt/s3,/mnt1/s3</value>
>     <final>true</final>
>   </property>
>   <property>
>     <name>fs.s3.impl</name>
>     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>   </property>
>   <property>
>     <name>fs.s3n.impl</name>
>     <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
>   </property>
>   <property>
>     <name>ipc.client.connect.max.retries.on.timeouts</name>
>     <value>5</value>
>   </property>
>   <property>
>     <name>hadoop.security.key.default.bitlength</name>
>     <value>256</value>
>   </property>
>   <property>
>     <name>hadoop.proxyuser.hadoop.groups</name>
>     <value>*</value>
>   </property>
>   <property>
>     <name>hadoop.tmp.dir</name>
>     <value>/mnt/var/lib/hadoop/tmp</value>
>   </property>
>   <property>
>     <name>hadoop.proxyuser.hadoop.hosts</name>
>     <value>*</value>
>   </property>
>   <property>
>     <name>io.file.buffer.size</name>
>     <value>65536</value>
>   </property>
>   <property>
>     <name>fs.AbstractFileSystem.s3.impl</name>
>     <value>org.apache.hadoop.fs.s3.EMRFSDelegate</value>
>   </property>
>   <property>
>     <name>fs.s3a.buffer.dir</name>
>     <value>/tmp</value>
>   </property>
>   <property>
>     <name>fs.s3bfs.impl</name>
>     <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
>   </property>
> </configuration>
> {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7365) warning of attempt to override final parameter: fs.s3.buffer.dir
Bowen Li created FLINK-7365: --- Summary: warning of attempt to override final parameter: fs.s3.buffer.dir Key: FLINK-7365 URL: https://issues.apache.org/jira/browse/FLINK-7365 Project: Flink Issue Type: Bug Reporter: Bowen Li I'm seeing hundreds of lines of the following log in my JobManager log file: {code:java} 2017-08-03 19:48:45,330 WARN org.apache.hadoop.conf.Configuration - /usr/lib/hadoop/etc/hadoop/core-site.xml:an attempt to override final parameter: fs.s3.buffer.dir; Ignoring. 2017-08-03 19:48:45,485 WARN org.apache.hadoop.conf.Configuration - /etc/hadoop/conf/core-site.xml:an attempt to override final parameter: fs.s3.buffer.dir; Ignoring. 2017-08-03 19:48:45,486 WARN org.apache.hadoop.conf.Configuration - /usr/lib/hadoop/etc/hadoop/core-site.xml:an attempt to override final parameter: fs.s3.buffer.dir; Ignoring. 2017-08-03 19:48:45,626 WARN org.apache.hadoop.conf.Configuration - /etc/hadoop/conf/core-site.xml:an attempt to override final parameter: fs.s3.buffer.dir; Ignoring ..
{code} Info of my Flink cluster: - Running on EMR with emr-5.6.0 - Using FSStateBackend, writing checkpointing data files to s3 - Configured s3 with S3AFileSystem according to https://ci.apache.org/projects/flink/flink-docs-release-1.4/setup/aws.html#set-s3-filesystem - AWS forbids resetting 'fs.s3.buffer.dir' value (it has a <final> tag on this property in core-site.xml), so I set 'fs.s3a.buffer.dir' as '/tmp' Here's my core-site.xml file: {code:xml}
<configuration>
  <property>
    <name>fs.s3.buffer.dir</name>
    <value>/mnt/s3,/mnt1/s3</value>
    <final>true</final>
  </property>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>fs.s3n.impl</name>
    <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
  </property>
  <property>
    <name>ipc.client.connect.max.retries.on.timeouts</name>
    <value>5</value>
  </property>
  <property>
    <name>hadoop.security.key.default.bitlength</name>
    <value>256</value>
  </property>
  <property>
    <name>hadoop.proxyuser.hadoop.groups</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/mnt/var/lib/hadoop/tmp</value>
  </property>
  <property>
    <name>hadoop.proxyuser.hadoop.hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>io.file.buffer.size</name>
    <value>65536</value>
  </property>
  <property>
    <name>fs.AbstractFileSystem.s3.impl</name>
    <value>org.apache.hadoop.fs.s3.EMRFSDelegate</value>
  </property>
  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp</value>
  </property>
  <property>
    <name>fs.s3bfs.impl</name>
    <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
  </property>
</configuration>
{code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7357) HOP_START() HOP_END() does not work when using HAVING clause with GROUP BY HOP window
[ https://issues.apache.org/jira/browse/FLINK-7357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haohui Mai reassigned FLINK-7357: - Assignee: Haohui Mai > HOP_START() HOP_END() does not work when using HAVING clause with GROUP BY > HOP window > - > > Key: FLINK-7357 > URL: https://issues.apache.org/jira/browse/FLINK-7357 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.1 >Reporter: Rong Rong >Assignee: Haohui Mai > > The following SQL does not compile: > {code:title=invalid_having_hop_start_sql} > SELECT > c AS k, > COUNT(a) AS v, > HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS > windowStart, > HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowEnd > FROM > T1 > GROUP BY > HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE), > c > HAVING > SUM(b) > 1 > {code} > While individually keeping HAVING clause or HOP_START field compiles and runs > without issue. > more details: > https://github.com/apache/flink/compare/master...walterddr:having_does_not_work_with_hop_start_end -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7364) Log exceptions from user code in streaming jobs
Elias Levy created FLINK-7364: - Summary: Log exceptions from user code in streaming jobs Key: FLINK-7364 URL: https://issues.apache.org/jira/browse/FLINK-7364 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 1.3.1 Reporter: Elias Levy Currently, if an exception arises in user-supplied code within an operator in a streaming job, Flink terminates the job, but it fails to record the reason for the termination. The logs do not record that there was an exception at all, much less the type of exception and where it occurred. This makes it difficult to debug jobs without implementing exception-recording code in all user-supplied operators. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
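Until the framework logs these failures itself, the per-operator workaround the ticket alludes to is a wrapper that catches, logs, and rethrows. A generic, framework-free sketch (all names are mine; in a real job this would wrap a {{MapFunction}} or similar user function):

```java
import java.util.function.Function;
import java.util.logging.Logger;

class LoggingWrapper {
    private static final Logger LOG = Logger.getLogger(LoggingWrapper.class.getName());

    // Wraps any user function so a failure is logged with the operator name
    // and offending input before the job is allowed to die.
    static <I, O> Function<I, O> logged(String opName, Function<I, O> userFn) {
        return input -> {
            try {
                return userFn.apply(input);
            } catch (RuntimeException e) {
                LOG.severe("Operator '" + opName + "' failed on input " + input + ": " + e);
                throw e; // still fail the job, but with a trace in the logs
            }
        };
    }
}
```

The drawback the ticket complains about is exactly that every user operator needs this boilerplate by hand instead of Flink doing it once at the operator boundary.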
[jira] [Commented] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
[ https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112899#comment-16112899 ] ASF GitHub Bot commented on FLINK-6493: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/4328 Sorry, Sir. I'll pay attention to the details of the next time. > Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals() > - > > Key: FLINK-6493 > URL: https://issues.apache.org/jira/browse/FLINK-6493 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > Fix For: 1.4.0 > > > {code} > && ((partitionStateSerializer == null && ((Snapshot) > obj).getPartitionStateSerializer() == null) > || partitionStateSerializer.equals(((Snapshot) > obj).getPartitionStateSerializer())) > && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot() == null) > || partitionStateSerializerConfigSnapshot.equals(((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot())); > {code} > The null check for partitionStateSerializer / > partitionStateSerializerConfigSnapshot is in combination with another clause. > This may lead to NPE in the partitionStateSerializer.equals() call. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4328: [FLINK-6493] Fix ineffective null check in RegisteredOper...
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/4328 Sorry, sir. I'll pay more attention to the details next time.
[jira] [Updated] (FLINK-3347) TaskManager (or its ActorSystem) need to restart in case they notice quarantine
[ https://issues.apache.org/jira/browse/FLINK-3347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-3347: --- Fix Version/s: (was: 1.2.0) 1.2.1 > TaskManager (or its ActorSystem) need to restart in case they notice > quarantine > --- > > Key: FLINK-3347 > URL: https://issues.apache.org/jira/browse/FLINK-3347 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 0.10.1 >Reporter: Stephan Ewen >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.0.0, 1.1.4, 1.3.0, 1.2.1 > > > There are cases where Akka quarantines remote actor systems. In that case, no > further communication is possible with that actor system unless one of the > two actor systems is restarted. > The result is that a TaskManager is up and available, but cannot register at > the JobManager (Akka refuses connection because of the quarantined state), > making the TaskManager a useless process. > I suggest to let the TaskManager restart itself once it notices that either > it quarantined the JobManager, or the JobManager quarantined it. > It is possible to recognize that by listening to certain events in the actor > system event stream: > http://stackoverflow.com/questions/32471088/akka-cluster-detecting-quarantined-state -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6094) Implement stream-stream proctime non-window inner join
[ https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112882#comment-16112882 ] ASF GitHub Bot commented on FLINK-6094: --- GitHub user hequn8128 opened a pull request: https://github.com/apache/flink/pull/4471 [FLINK-6094] [table] Implement stream-stream proctime non-window inner join ## What is the purpose of the change Implement stream-stream proctime non-window inner join for table-api/sql ## Brief change log - Implement stream-stream inner join. `DataStreamJoinRule`, `DataStreamJoin`... - Implement retraction for stream-stream inner join. - Support state retention time configuration for stream-stream inner join. - Add key extractor for `DataStreamJoin` in `UniqueKeyExtractor`. - Modify `UniqueKeyExtractor` to support cases like `select(pk as pk1, pk as pk2)`; in those cases, either pk1 or pk2 can be the unique key. ## Verifying this change This change added tests and can be verified as follows: - Add JoinValidation tests - Add JoinHarnessTest to test data expiration - Add JoinITCase for SQL and table-api ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (yes) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? 
(docs, I have checked the sql and table-api docs and found that inner join has already been added) You can merge this pull request into a Git repository by running: $ git pull https://github.com/hequn8128/flink 6094_pr Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4471.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4471 commit 459c3b5b7801a15eac907ab5c11f8ca01acdc1c6 Author: 军长 Date: 2017-07-30T10:45:45Z [FLINK-6094] [table] Implement stream-stream proctime non-window inner join > Implement stream-stream proctime non-window inner join > --- > > Key: FLINK-6094 > URL: https://issues.apache.org/jira/browse/FLINK-6094 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Hequn Cheng > > This includes: > 1.Implement stream-stream proctime non-window inner join > 2.Implement the retract process logic for join -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4471: [FLINK-6094] [table] Implement stream-stream proct...
GitHub user hequn8128 opened a pull request: https://github.com/apache/flink/pull/4471 [FLINK-6094] [table] Implement stream-stream proctime non-window inner join ## What is the purpose of the change Implement stream-stream proctime non-window inner join for table-api/sql ## Brief change log - Implement stream-stream inner join. `DataStreamJoinRule`, `DataStreamJoin`... - Implement retraction for stream-stream inner join. - Support state retention time configuration for stream-stream inner join. - Add key extractor for `DataStreamJoin` in `UniqueKeyExtractor`. - Modify `UniqueKeyExtractor` to support cases like `select(pk as pk1, pk as pk2)`; in those cases, either pk1 or pk2 can be the unique key. ## Verifying this change This change added tests and can be verified as follows: - Add JoinValidation tests - Add JoinHarnessTest to test data expiration - Add JoinITCase for SQL and table-api ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (yes) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? 
(docs, I have checked the sql and table-api docs and found that inner join has already been added) You can merge this pull request into a Git repository by running: $ git pull https://github.com/hequn8128/flink 6094_pr Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4471.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4471 commit 459c3b5b7801a15eac907ab5c11f8ca01acdc1c6 Author: 军长 Date: 2017-07-30T10:45:45Z [FLINK-6094] [table] Implement stream-stream proctime non-window inner join
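Independent of Flink's actual operator API, the core mechanic of a proctime non-window inner join is symmetric: each side buffers its rows by join key and, on every arrival, probes the other side's buffered state. A deliberately simplified sketch of that mechanic (no retraction and no state retention time, and all names are mine):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SymmetricHashJoin {
    // Per-key buffered rows for each input; in Flink these would live in keyed state.
    private final Map<String, List<String>> leftState = new HashMap<>();
    private final Map<String, List<String>> rightState = new HashMap<>();
    private final List<String> emitted = new ArrayList<>();

    void onLeft(String key, String row) {
        leftState.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        // Probe the opposite side's buffered rows and emit every match.
        for (String r : rightState.getOrDefault(key, List.of())) {
            emitted.add(row + "|" + r);
        }
    }

    void onRight(String key, String row) {
        rightState.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        for (String l : leftState.getOrDefault(key, List.of())) {
            emitted.add(l + "|" + row);
        }
    }

    List<String> results() {
        return emitted;
    }
}
```

The PR's state retention time and retraction support then bound and correct this ever-growing buffered state, which the sketch intentionally omits.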
[jira] [Commented] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
[ https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112878#comment-16112878 ] ASF GitHub Bot commented on FLINK-6493: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4328 Thanks! Alright, this looks good now, merging (will address my final own comment while merging)! > Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals() > - > > Key: FLINK-6493 > URL: https://issues.apache.org/jira/browse/FLINK-6493 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > Fix For: 1.4.0 > > > {code} > && ((partitionStateSerializer == null && ((Snapshot) > obj).getPartitionStateSerializer() == null) > || partitionStateSerializer.equals(((Snapshot) > obj).getPartitionStateSerializer())) > && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot() == null) > || partitionStateSerializerConfigSnapshot.equals(((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot())); > {code} > The null check for partitionStateSerializer / > partitionStateSerializerConfigSnapshot is in combination with another clause. > This may lead to NPE in the partitionStateSerializer.equals() call. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
[ https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112877#comment-16112877 ] ASF GitHub Bot commented on FLINK-6493: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4328#discussion_r131167537 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java --- @@ -175,14 +176,17 @@ public boolean equals(Object obj) { return false; } + if (!(obj instanceof Snapshot)) { + return false; + } + + Snapshot snapshot = (Snapshot)obj; + // need to check for nulls because serializer and config snapshots may be null on restore - return (obj instanceof Snapshot) - && name.equals(((Snapshot) obj).getName()) - && assignmentMode.equals(((Snapshot) obj).getAssignmentMode()) - && ((partitionStateSerializer == null && ((Snapshot) obj).getPartitionStateSerializer() == null) - || partitionStateSerializer.equals(((Snapshot) obj).getPartitionStateSerializer())) - && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) obj).getPartitionStateSerializerConfigSnapshot() == null) - || partitionStateSerializerConfigSnapshot.equals(((Snapshot) obj).getPartitionStateSerializerConfigSnapshot())); + return name.equals(snapshot.getName()) + && assignmentMode.equals(snapshot.getAssignmentMode()) + && (Objects.equals(partitionStateSerializer, snapshot.getPartitionStateSerializer())) --- End diff -- These extra most-outer wrapping parenthesis are redundant. I'll remove them when merging. 
> Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals() > - > > Key: FLINK-6493 > URL: https://issues.apache.org/jira/browse/FLINK-6493 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > Fix For: 1.4.0 > > > {code} > && ((partitionStateSerializer == null && ((Snapshot) > obj).getPartitionStateSerializer() == null) > || partitionStateSerializer.equals(((Snapshot) > obj).getPartitionStateSerializer())) > && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot() == null) > || partitionStateSerializerConfigSnapshot.equals(((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot())); > {code} > The null check for partitionStateSerializer / > partitionStateSerializerConfigSnapshot is in combination with another clause. > This may lead to NPE in the partitionStateSerializer.equals() call. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4328: [FLINK-6493] Fix ineffective null check in RegisteredOper...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4328 Thanks! Alright, this looks good now, merging (will address my own final comment while merging)! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-7309) NullPointerException in CodeGenUtils.timePointToInternalCode() generated code
[ https://issues.apache.org/jira/browse/FLINK-7309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112834#comment-16112834 ] Liangliang Chen commented on FLINK-7309: hi [~twalthr], I'm not very familiar with Scala, so I rewrote the test example in Java, as below: {code} public class TestNullSQL { public static void main(String[] args) throws Exception { // set up execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env); TypeInformation[] types = {BasicTypeInfo.INT_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP}; String[] names = {"id", "ts"}; RowTypeInfo typeInfo = new RowTypeInfo(types, names); // we assign a null value here!! DataStream input = env.fromElements(Row.of(1001, null)).returns(typeInfo); tEnv.registerDataStream("test_table", input); Table table = tEnv.sql("SELECT id, ts FROM test_table"); DataStream result = tEnv.toAppendStream(table, Row.class); result.print(); env.execute(); } } {code} I use the Row type in this example and the exception still happens. The Row data type supports an arbitrary number of fields, and fields may hold {quote}null{quote} values, so I think the generated code has a problem. What do you think? > NullPointerException in CodeGenUtils.timePointToInternalCode() generated code > - > > Key: FLINK-7309 > URL: https://issues.apache.org/jira/browse/FLINK-7309 > Project: Flink > Issue Type: Bug > Components: Local Runtime, Table API & SQL >Affects Versions: 1.3.1 >Reporter: Liangliang Chen >Priority: Critical > > The code generated by CodeGenUtils.timePointToInternalCode() will cause a > NullPointerException when SQL table field type is `TIMESTAMP` and the field > value is `null`. 
> Example to reproduce: > {code} > object StreamSQLExample { > def main(args: Array[String]): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env) > // null field value > val orderA: DataStream[Order] = env.fromCollection(Seq( > Order(null, "beer", 3))) > > tEnv.registerDataStream("OrderA", orderA, 'ts, 'product, 'amount) > val result = tEnv.sql("SELECT * FROM OrderA") > result.toAppendStream[Order].print() > > env.execute() > } > case class Order(ts: Timestamp, product: String, amount: Int) > } > {code} > In the above example, timePointToInternalCode() will generate > statements like this: > {code} > ... > long result$1 = > org.apache.calcite.runtime.SqlFunctions.toLong((java.sql.Timestamp) in1.ts()); > boolean isNull$2 = (java.sql.Timestamp) in1.ts() == null; > ... > {code} > So the NPE happens when in1.ts() is null.
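The root cause is the ordering in the generated statements: the conversion to a long runs before the null check, so a null timestamp is dereferenced. A minimal illustration of the null-safe ordering, using a hypothetical helper rather than the actual generated code (and `Timestamp.getTime()` in place of Calcite's `SqlFunctions.toLong`):

```java
import java.sql.Timestamp;

// Hypothetical sketch of the fix direction: test for null FIRST,
// and perform the conversion only when the value is non-null.
class NullSafeConversion {
    static long toInternal(Timestamp ts) {
        boolean isNull = (ts == null);      // null check comes first
        return isNull ? 0L : ts.getTime();  // convert only when non-null
    }
}
```

The buggy ordering in the report does the equivalent of `ts.getTime()` unconditionally and computes `isNull` afterwards, which is where the NPE comes from.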
[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112770#comment-16112770 ] ASF GitHub Bot commented on FLINK-5486: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/4356 ``` Tests run: 6, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 91.985 sec <<< FAILURE! - in org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase testExternalizedFullRocksDBCheckpointsStandalone(org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase) Time elapsed: 29.976 sec <<< ERROR! java.io.IOException: java.lang.Exception: Failed to complete checkpoint at org.apache.flink.runtime.testingUtils.TestingCluster.requestCheckpoint(TestingCluster.scala:399) at org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase.testExternalizedCheckpoints(ExternalizedCheckpointITCase.java:218) at org.apache.flink.test.checkpointing.ExternalizedCheckpointITCase.testExternalizedFullRocksDBCheckpointsStandalone(ExternalizedCheckpointITCase.java:78) Caused by: java.lang.Exception: Failed to complete checkpoint at org.apache.flink.runtime.testingUtils.TestingJobManagerLike$$anonfun$handleTestingMessage$1$$anon$2.apply(TestingJobManagerLike.scala:375) at org.apache.flink.runtime.testingUtils.TestingJobManagerLike$$anonfun$handleTestingMessage$1$$anon$2.apply(TestingJobManagerLike.scala:358) at org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272) at akka.dispatch.OnComplete.internal(Future.scala:247) at akka.dispatch.OnComplete.internal(Future.scala:245) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) ``` > Lack of synchronization in BucketingSink#handleRestoredBucketState() > > > Key: FLINK-5486 > URL: https://issues.apache.org/jira/browse/FLINK-5486 > Project: Flink > Issue Type: Bug > 
Components: Streaming Connectors >Reporter: Ted Yu >Assignee: mingleizhang > Fix For: 1.3.3 > > > Here is related code: > {code} > > handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); > synchronized (bucketState.pendingFilesPerCheckpoint) { > bucketState.pendingFilesPerCheckpoint.clear(); > } > {code} > The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside > the synchronization block. Otherwise during the processing of > handlePendingFilesForPreviousCheckpoints(), some entries of the map may be > cleared.
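The fix direction described in the issue — enclosing the `handlePendingFilesForPreviousCheckpoints()` call in the same `synchronized` block that clears the map — can be sketched as follows. This is a hypothetical, heavily simplified stand-in, not the actual `BucketingSink` code: the point is only that the read of the pending-files map and its `clear()` happen under one lock, so no concurrent writer can clear entries mid-processing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: both the processing loop and clear() hold the same lock.
class RestoreSketch {
    private final Map<Long, String> pendingFilesPerCheckpoint = new HashMap<>();
    private final StringBuilder handled = new StringBuilder();

    void addPending(long checkpointId, String file) {
        synchronized (pendingFilesPerCheckpoint) {
            pendingFilesPerCheckpoint.put(checkpointId, file);
        }
    }

    void handleRestoredBucketState() {
        synchronized (pendingFilesPerCheckpoint) {
            // previously this loop ran OUTSIDE the synchronized block, so
            // another thread could clear entries while they were processed
            for (String file : pendingFilesPerCheckpoint.values()) {
                handled.append(file);
            }
            pendingFilesPerCheckpoint.clear();
        }
    }

    String handled() { return handled.toString(); }

    int pendingCount() {
        synchronized (pendingFilesPerCheckpoint) {
            return pendingFilesPerCheckpoint.size();
        }
    }
}
```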
[jira] [Commented] (FLINK-7293) Support custom order by in PatternStream
[ https://issues.apache.org/jira/browse/FLINK-7293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112705#comment-16112705 ] ASF GitHub Bot commented on FLINK-7293: --- Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/4418 +1 from my side. Let's just wait a bit more to see if @kl0u wants to add some comments, as he commented on the JIRA. As a side note for the future, please fill in the new contribution checklist, as it helps with reviewing. > Support custom order by in PatternStream > > > Key: FLINK-7293 > URL: https://issues.apache.org/jira/browse/FLINK-7293 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > Currently, when {{ProcessingTime}} is configured, the events are fed to the NFA > in order of arrival time, and when {{EventTime}} is configured, the events are fed to the NFA > in order of event time. A custom {{order by}} should also be allowed, so that > users can define the order of the events beyond the factors above.
[jira] [Commented] (FLINK-5720) Deprecate "Folding" in all of DataStream API
[ https://issues.apache.org/jira/browse/FLINK-5720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112702#comment-16112702 ] ASF GitHub Bot commented on FLINK-5720: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/3816 Actually, `KeyedStream.aggregate` is protected and thus not accessible. You may, however, access `WindowedStream#aggregate(AggregateFunction)` and `AllWindowedStream#aggregate(AggregateFunction)` for windowed streams and use a stateful map or flatmap for the rest. > Deprecate "Folding" in all of DataStream API > > > Key: FLINK-5720 > URL: https://issues.apache.org/jira/browse/FLINK-5720 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Aljoscha Krettek >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.3.0, 1.4.0 > > > Folding is an operation that cannot be done incrementally in a distributed > way and that also cannot be done on merging windows. Now that we have > {{AggregatingState}} and aggregate operations, we should deprecate folding in > the APIs and deprecate {{FoldingState}}. > I suggest removing folding completely in Flink 2.0
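The reason aggregate() can replace fold() is the extra merge operation: two partial accumulators from merging windows can be combined, which fold results cannot. The sketch below mirrors the add/getResult/merge shape of Flink's AggregateFunction in plain, self-contained Java — it is an illustration of the idea, not the real Flink interface:

```java
// Hypothetical accumulator for a running average (not Flink's API).
final class AverageAccumulator {
    long sum;
    long count;
}

// Mirrors the shape of an AggregateFunction: createAccumulator/add/getResult/merge.
class AverageAggregate {
    AverageAccumulator createAccumulator() { return new AverageAccumulator(); }

    AverageAccumulator add(long value, AverageAccumulator acc) {
        acc.sum += value;
        acc.count += 1;
        return acc;
    }

    double getResult(AverageAccumulator acc) {
        return acc.count == 0 ? 0.0 : (double) acc.sum / acc.count;
    }

    // fold() has no equivalent of this: folded results cannot generally be
    // combined, but accumulators can, which is what merging windows need.
    AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        a.sum += b.sum;
        a.count += b.count;
        return a;
    }
}
```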
[jira] [Commented] (FLINK-7147) Support greedy quantifier in CEP
[ https://issues.apache.org/jira/browse/FLINK-7147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112698#comment-16112698 ] ASF GitHub Bot commented on FLINK-7147: --- Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/4296 I think we are getting close to the final version. Good job! Still, I have two higher-level comments: - is the current times().greedy() behaviour intended? For pattern `a{2, 5} b` and sequence `a1 a2 a3 a4 b`, shouldn't it return just one match (`a1 a2 a3 a4 b`) instead of three, as in `testGreedyTimesRange`? - this feature should be documented in the docs. It should also be stated very clearly there that it does not work for `GroupPatterns` > Support greedy quantifier in CEP > > > Key: FLINK-7147 > URL: https://issues.apache.org/jira/browse/FLINK-7147 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API & SQL >Reporter: Dian Fu >Assignee: Dian Fu > > A greedy quantifier will try to match the token as many times as possible. For > example, for pattern {{a b* c}} (skip till next is used) and inputs {{a b1 b2 > c}}, if the quantifier for {{b}} is greedy, it will only output {{a b1 b2 c}}.
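The greedy semantics being added to CEP parallel greedy vs. reluctant quantifiers in java.util.regex, which makes for a convenient self-contained illustration — this is an analogy only, not the Flink CEP API: `b*` consumes as many tokens as possible, `b*?` as few as possible.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Regex analogy for greedy quantifiers (not the Flink CEP API):
// returns the first match of the regex in the input, or null if none.
class GreedyDemo {
    static String firstMatch(String regex, String input) {
        Matcher m = Pattern.compile(regex).matcher(input);
        return m.find() ? m.group() : null;
    }
}
```

On input `abbc`, the greedy pattern `ab*` matches `abb` while the reluctant `ab*?` matches just `a`; with a trailing anchor, as in `ab*c` (the regex analogue of the JIRA's {{a b* c}} example), both must consume all the `b`s to reach `c`.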
[jira] [Commented] (FLINK-7127) Remove unnecessary null check or add null check
[ https://issues.apache.org/jira/browse/FLINK-7127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112659#comment-16112659 ] ASF GitHub Bot commented on FLINK-7127: --- Github user mylog00 commented on the issue: https://github.com/apache/flink/pull/4349 Thanks for the advice. I will not repeat this mistake in the future :) > Remove unnecessary null check or add null check > --- > > Key: FLINK-7127 > URL: https://issues.apache.org/jira/browse/FLINK-7127 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Priority: Trivial > Labels: starter > > In {{HeapKeyedStateBackend#snapshot}} we have: > {code} > for (Map.Entry> kvState : stateTables.entrySet()) > { > // 1) Here we don't check for null > metaInfoSnapshots.add(kvState.getValue().getMetaInfo().snapshot()); > kVStateToId.put(kvState.getKey(), kVStateToId.size()); > // 2) Here we check for null > StateTable stateTable = kvState.getValue(); > if (null != stateTable) { > cowStateStableSnapshots.put(stateTable, > stateTable.createSnapshot()); > } > } > {code} > Either this can lead to an NPE, in which case we should add a check at 1), or we should remove the > null check at 2).
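The inconsistency in the quoted loop is easiest to see when the map value is read into a local once and then used the same way everywhere: either every use is guarded, or none is. The sketch below is a hypothetical, simplified stand-in (plain strings instead of Flink's StateTable, not the actual HeapKeyedStateBackend code) showing the "guard both uses" resolution:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: read the value once, guard it once, and let ALL uses
// sit behind the same null check -- not one unguarded use followed by a guarded one.
class SnapshotSketch {
    static Map<String, Integer> assignIds(Map<String, String> stateTables) {
        Map<String, Integer> kvStateToId = new LinkedHashMap<>();
        for (Map.Entry<String, String> kvState : stateTables.entrySet()) {
            String stateTable = kvState.getValue();
            if (stateTable == null) {
                continue; // a single guard covers everything below
            }
            kvStateToId.put(kvState.getKey(), kvStateToId.size());
        }
        return kvStateToId;
    }
}
```

If the values can in fact never be null (as the unguarded use at 1) implies), the other valid resolution is simply deleting the check at 2).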
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112654#comment-16112654 ] ASF GitHub Bot commented on FLINK-7343: --- GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/4470 [FLINK-7343] Simulate network failures in kafka at-least-once test We shouldn't fail KafkaServers directly, because they might not be able to flush the data (`log.flush.interval.***` properties). Since we don't want to test how well Kafka implements at-least-once/exactly-once semantic, it is a better idea (and hopefully more reliable) to just simulate network failure between Flink and Kafka in our at-least-once tests. To achieve that I have implemented `NetworkFailuresProxy` class. You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink network-failures Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4470.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4470 commit 0e28327619893cfbf793fa842be3d965f649516c Author: Piotr Nowojski Date: 2017-08-01T14:05:49Z [FLINK-7343][kafka] Increase Xmx for tests Sometimes 1000m was not enough memory to run at-least-once tests with broker failures on Travis commit 8d820c3d0e77624a945e074f4a1bc476b5fd0f75 Author: Piotr Nowojski Date: 2017-08-01T16:11:27Z [FLINK-7343] Add network proxy utility to simulate network failures commit 967e1dfc87846b4011652bbaefab696900abc8dd Author: Piotr Nowojski Date: 2017-08-03T07:25:04Z fixup! [FLINK-7343][kafka] Increase Xmx for tests commit 27b20f2ec3770231d95c3c7918c9313ce58b5e18 Author: Piotr Nowojski Date: 2017-08-03T09:27:12Z [FLINK-7343] Use NetworkFailureProxy in kafka tests We shouldn't fail KafkaServers directly, because they might not be able to flush the data. 
Since we don't want to test how well Kafka implements at-least-once/exactly-once semantics, we just simulate network failure between Flink and Kafka in our at-least-once tests. commit 692b5944f16b98aafe716ca1d18a04fa8a033798 Author: Piotr Nowojski Date: 2017-08-03T09:35:26Z [hotfix][Kafka] Clean up getKafkaServer method > Kafka010ProducerITCase instability > -- > > Key: FLINK-7343 > URL: https://issues.apache.org/jira/browse/FLINK-7343 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > Labels: test-stability > > As reported by [~till.rohrmann] in > https://issues.apache.org/jira/browse/FLINK-6996 there seems to be a test > instability with > `Kafka010ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink` > https://travis-ci.org/tillrohrmann/flink/jobs/258538641 > It is probably related to log.flush intervals in Kafka, which delay flushing > the data to files and can potentially cause data loss when killing Kafka brokers > in the tests.
[jira] [Commented] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
[ https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112653#comment-16112653 ] ASF GitHub Bot commented on FLINK-6493: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/4328 Hello @tzulitai, the code has been updated. Please check it out. :) > Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals() > - > > Key: FLINK-6493 > URL: https://issues.apache.org/jira/browse/FLINK-6493 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > Fix For: 1.4.0 > > > {code} > && ((partitionStateSerializer == null && ((Snapshot) > obj).getPartitionStateSerializer() == null) > || partitionStateSerializer.equals(((Snapshot) > obj).getPartitionStateSerializer())) > && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot() == null) > || partitionStateSerializerConfigSnapshot.equals(((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot())); > {code} > The null check for partitionStateSerializer / > partitionStateSerializerConfigSnapshot is in combination with another clause. > This may lead to an NPE in the partitionStateSerializer.equals() call.
[jira] [Commented] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
[ https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112647#comment-16112647 ] ASF GitHub Bot commented on FLINK-6493: --- Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/4328#discussion_r131127614 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java --- @@ -175,14 +176,19 @@ public boolean equals(Object obj) { return false; } + if (!(obj instanceof Snapshot)) { + return false; + } + + Snapshot snapshot = (Snapshot)obj; + // need to check for nulls because serializer and config snapshots may be null on restore - return (obj instanceof Snapshot) - && name.equals(((Snapshot) obj).getName()) - && assignmentMode.equals(((Snapshot) obj).getAssignmentMode()) - && ((partitionStateSerializer == null && ((Snapshot) obj).getPartitionStateSerializer() == null) - || partitionStateSerializer.equals(((Snapshot) obj).getPartitionStateSerializer())) - && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) obj).getPartitionStateSerializerConfigSnapshot() == null) - || partitionStateSerializerConfigSnapshot.equals(((Snapshot) obj).getPartitionStateSerializerConfigSnapshot())); + return Objects.equals(name, snapshot.getName()) + && Objects.equals(assignmentMode, snapshot.getAssignmentMode()) + && ((partitionStateSerializer == null && (snapshot.getPartitionStateSerializer() == null) + || (Objects.equals(partitionStateSerializer, snapshot.getPartitionStateSerializer( --- End diff -- Thanks you so much! Have been addressed. 
> Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals() > - > > Key: FLINK-6493 > URL: https://issues.apache.org/jira/browse/FLINK-6493 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > Fix For: 1.4.0 > > > {code} > && ((partitionStateSerializer == null && ((Snapshot) > obj).getPartitionStateSerializer() == null) > || partitionStateSerializer.equals(((Snapshot) > obj).getPartitionStateSerializer())) > && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot() == null) > || partitionStateSerializerConfigSnapshot.equals(((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot())); > {code} > The null check for partitionStateSerializer / > partitionStateSerializerConfigSnapshot is in combination with another clause. > This may lead to NPE in the partitionStateSerializer.equals() call. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
[ https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112636#comment-16112636 ] ASF GitHub Bot commented on FLINK-6493: --- Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/4328#discussion_r131125429 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java --- @@ -175,14 +176,19 @@ public boolean equals(Object obj) { return false; } + if (!(obj instanceof Snapshot)) { + return false; + } + + Snapshot snapshot = (Snapshot)obj; + // need to check for nulls because serializer and config snapshots may be null on restore - return (obj instanceof Snapshot) - && name.equals(((Snapshot) obj).getName()) - && assignmentMode.equals(((Snapshot) obj).getAssignmentMode()) - && ((partitionStateSerializer == null && ((Snapshot) obj).getPartitionStateSerializer() == null) - || partitionStateSerializer.equals(((Snapshot) obj).getPartitionStateSerializer())) - && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) obj).getPartitionStateSerializerConfigSnapshot() == null) - || partitionStateSerializerConfigSnapshot.equals(((Snapshot) obj).getPartitionStateSerializerConfigSnapshot())); + return Objects.equals(name, snapshot.getName()) + && Objects.equals(assignmentMode, snapshot.getAssignmentMode()) --- End diff -- Yes. You are right. I did this because I want make them have the same way, Objects.equal. Now, I will return to the previously method. 
Thanks ~ > Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals() > - > > Key: FLINK-6493 > URL: https://issues.apache.org/jira/browse/FLINK-6493 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > Fix For: 1.4.0 > > > {code} > && ((partitionStateSerializer == null && ((Snapshot) > obj).getPartitionStateSerializer() == null) > || partitionStateSerializer.equals(((Snapshot) > obj).getPartitionStateSerializer())) > && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot() == null) > || partitionStateSerializerConfigSnapshot.equals(((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot())); > {code} > The null check for partitionStateSerializer / > partitionStateSerializerConfigSnapshot is in combination with another clause. > This may lead to NPE in the partitionStateSerializer.equals() call. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7076) Implement container release to support dynamic scaling
[ https://issues.apache.org/jira/browse/FLINK-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112624#comment-16112624 ] Till Rohrmann commented on FLINK-7076: -- Hi [~yuemeng], let's first tackle the second problem which is how to release a container on the RM side. This basically means how to give it back to Yarn. I think the {{YarnResourceManager#stopWorker}} method should do this. > Implement container release to support dynamic scaling > -- > > Key: FLINK-7076 > URL: https://issues.apache.org/jira/browse/FLINK-7076 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Reporter: Till Rohrmann >Assignee: yuemeng > Labels: flip-6 > > In order to support dynamic scaling, the {{YarnResourceManager}} has to be > able to dynamically free containers. We have to implement the > {{YarnResourceManager#stopWorker}} method. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7335) Remove Flink's own future implementation
[ https://issues.apache.org/jira/browse/FLINK-7335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112618#comment-16112618 ] ASF GitHub Bot commented on FLINK-7335: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4463 > Remove Flink's own future implementation > > > Key: FLINK-7335 > URL: https://issues.apache.org/jira/browse/FLINK-7335 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.4.0 > > > Delete Flink's own future implementation. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-7252) Remove Flink Futures or back them by CompletableFutures
[ https://issues.apache.org/jira/browse/FLINK-7252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-7252. -- Resolution: Fixed Flink's own future implementation has been removed and its usage has been replaced by Java 8's {{CompletableFuture}}. > Remove Flink Futures or back them by CompletableFutures > --- > > Key: FLINK-7252 > URL: https://issues.apache.org/jira/browse/FLINK-7252 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Stephan Ewen >Assignee: Till Rohrmann > Fix For: 1.4.0 > > > Issue to track the replacement of Flink's own futures by Java 8's > {{CompletableFuture}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7335) Remove Flink's own future implementation
[ https://issues.apache.org/jira/browse/FLINK-7335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-7335. Resolution: Done Removed via 1fabe0cd04306fd5bd607729bf9ad9f22cc25ed2 > Remove Flink's own future implementation > > > Key: FLINK-7335 > URL: https://issues.apache.org/jira/browse/FLINK-7335 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.4.0 > > > Delete Flink's own future implementation. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
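The composition style that motivated dropping Flink's own future type can be sketched with plain Java 8 futures. The snippet below is illustrative only and not taken from the Flink codebase:

```java
import java.util.concurrent.CompletableFuture;

public class CompletableFutureSketch {
    public static void main(String[] args) {
        // Java 8's CompletableFuture already provides async execution,
        // result transformation, and error handling, which is why a
        // custom future implementation is no longer needed.
        CompletableFuture<Integer> result = CompletableFuture
            .supplyAsync(() -> 21)      // run on the common ForkJoinPool
            .thenApply(x -> x * 2)      // transform the completed value
            .exceptionally(t -> -1);    // fall back to -1 on failure
        System.out.println(result.join()); // prints 42
    }
}
```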
[jira] [Commented] (FLINK-7335) Remove Flink's own future implementation
[ https://issues.apache.org/jira/browse/FLINK-7335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112616#comment-16112616 ] ASF GitHub Bot commented on FLINK-7335: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4463#discussion_r131120916 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager import java.io.IOException import java.net._ import java.util.UUID -import java.util.concurrent.{Future => JavaFuture, _} +import java.util.concurrent.{TimeUnit, Future => _, TimeoutException => _, _} --- End diff -- Exclude `Future` from being imported. > Remove Flink's own future implementation > > > Key: FLINK-7335 > URL: https://issues.apache.org/jira/browse/FLINK-7335 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.4.0 > > > Delete Flink's own future implementation. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7334) Replace Flink's futures by CompletableFuture in RpcGateway
[ https://issues.apache.org/jira/browse/FLINK-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112611#comment-16112611 ] ASF GitHub Bot commented on FLINK-7334: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4462 > Replace Flink's futures by CompletableFuture in RpcGateway > -- > > Key: FLINK-7334 > URL: https://issues.apache.org/jira/browse/FLINK-7334 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7356) misleading s3 file uri in configuration file
[ https://issues.apache.org/jira/browse/FLINK-7356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-7356: Fix Version/s: (was: 1.3.2) 1.3.3 > misleading s3 file uri in configuration file > > > Key: FLINK-7356 > URL: https://issues.apache.org/jira/browse/FLINK-7356 > Project: Flink > Issue Type: Bug > Components: Configuration >Affects Versions: 1.3.1 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.4.0, 1.3.3 > > > in > https://github.com/apache/flink/blob/master/flink-dist/src/main/resources/flink-conf.yaml, > the comment in line 121 should say {{"*s3*://" for S3 file system}} rather > than {{"S3://" for S3 file system}}, because {{S3://xxx}} is not recognized > by AWS SDK. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7334) Replace Flink's futures by CompletableFuture in RpcGateway
[ https://issues.apache.org/jira/browse/FLINK-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-7334. Resolution: Done Done via eddafc1ac9e4d787df44b63809f0d6dfd1f3def7 > Replace Flink's futures by CompletableFuture in RpcGateway > -- > > Key: FLINK-7334 > URL: https://issues.apache.org/jira/browse/FLINK-7334 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7265) FileSystems should describe their kind and consistency level
[ https://issues.apache.org/jira/browse/FLINK-7265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-7265: Fix Version/s: (was: 1.3.2) 1.3.3 > FileSystems should describe their kind and consistency level > > > Key: FLINK-7265 > URL: https://issues.apache.org/jira/browse/FLINK-7265 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.3.1 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.4.0, 1.3.3 > > > Currently, all {{FileSystem}} types look the same to Flink. > However, certain operations should only be executed on certain kinds of file > systems. > For example, it makes no sense to attempt to delete an empty parent directory > on S3, because there are no such things as directories, only hierarchical > naming in the keys (file names). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7100) TaskManager metrics are registered twice
[ https://issues.apache.org/jira/browse/FLINK-7100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-7100: Fix Version/s: (was: 1.3.2) 1.3.3 > TaskManager metrics are registered twice > > > Key: FLINK-7100 > URL: https://issues.apache.org/jira/browse/FLINK-7100 > Project: Flink > Issue Type: Bug > Components: Local Runtime, Metrics >Affects Versions: 1.3.0, 1.4.0 >Reporter: Chesnay Schepler >Assignee: Fang Yong > Labels: flip-6 > Fix For: 1.4.0, 1.3.3 > > > TaskManager metrics are currently registered twice, once when the TaskManager > is started and once when the TaskManager associates with a JobManager. > Originally the metrics were registered when the TM associates with the JM and > unregistered upon disassociation. > 9e9776f17ed18b12af177e31ab0bc266236f85ef modified the {{TaskManager}} to use > the {{TaskManagerServices}}, which when loaded _also_ register the metrics. > I suggest to remove the registrations that happen upon (dis-)association. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()
[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-5486: Fix Version/s: (was: 1.3.2) > Lack of synchronization in BucketingSink#handleRestoredBucketState() > > > Key: FLINK-5486 > URL: https://issues.apache.org/jira/browse/FLINK-5486 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: mingleizhang > Fix For: 1.3.3 > > > Here is related code: > {code} > > handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint); > synchronized (bucketState.pendingFilesPerCheckpoint) { > bucketState.pendingFilesPerCheckpoint.clear(); > } > {code} > The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside > the synchronization block. Otherwise during the processing of > handlePendingFilesForPreviousCheckpoints(), some entries of the map may be > cleared. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
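The fix described in FLINK-5486 can be sketched as follows: the processing call moves inside the same synchronized block that clears the map, so no entries can be removed mid-processing. The class and method names below are simplified stand-ins, not the actual BucketingSink API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RestoreSketch {
    private final Map<Long, String> pendingFilesPerCheckpoint = new HashMap<>();
    private final List<String> finalized = new ArrayList<>();

    void handleRestoredBucketState(Map<Long, String> restored) {
        pendingFilesPerCheckpoint.putAll(restored);
        // FLINK-5486: processing and clear() happen under the same lock,
        // so a concurrent thread cannot clear entries that are still
        // being processed.
        synchronized (pendingFilesPerCheckpoint) {
            handlePendingFilesForPreviousCheckpoints(pendingFilesPerCheckpoint);
            pendingFilesPerCheckpoint.clear();
        }
    }

    private void handlePendingFilesForPreviousCheckpoints(Map<Long, String> pending) {
        // Stand-in for finalizing the restored pending files.
        pending.values().forEach(finalized::add);
    }

    List<String> finalizedFiles() {
        return finalized;
    }
}
```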
[jira] [Updated] (FLINK-6900) Limit size of individual components in DropwizardReporter
[ https://issues.apache.org/jira/browse/FLINK-6900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-6900: Fix Version/s: (was: 1.3.2) 1.3.3 > Limit size of individual components in DropwizardReporter > > > Key: FLINK-6900 > URL: https://issues.apache.org/jira/browse/FLINK-6900 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.3.0, 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.4.0, 1.3.3 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7266) Don't attempt to delete parent directory on S3
[ https://issues.apache.org/jira/browse/FLINK-7266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112606#comment-16112606 ] Aljoscha Krettek commented on FLINK-7266: - This is actually resolved on {{release-1.3}} for the s3 filesystem. > Don't attempt to delete parent directory on S3 > -- > > Key: FLINK-7266 > URL: https://issues.apache.org/jira/browse/FLINK-7266 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.3.1 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Critical > Fix For: 1.4.0, 1.3.2 > > > Currently, every attempted release of an S3 state object also checks if the > "parent directory" is empty and then tries to delete it. > Not only is that unnecessary on S3, but it is prohibitively expensive and for > example causes S3 to throttle calls by the JobManager on checkpoint cleanup. > The {{FileState}} must only attempt parent directory cleanup when operating > against real file systems, not when operating against object stores. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7335) Remove Flink's own future implementation
[ https://issues.apache.org/jira/browse/FLINK-7335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112605#comment-16112605 ] ASF GitHub Bot commented on FLINK-7335: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4463 Thanks for the review @zentol. Merging this PR after merging #4462. > Remove Flink's own future implementation > > > Key: FLINK-7335 > URL: https://issues.apache.org/jira/browse/FLINK-7335 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.4.0 > > > Delete Flink's own future implementation. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7334) Replace Flink's futures by CompletableFuture in RpcGateway
[ https://issues.apache.org/jira/browse/FLINK-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112604#comment-16112604 ] ASF GitHub Bot commented on FLINK-7334: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4462 Thanks for reviewing @zentol. I'll address your comments and then merge this PR. > Replace Flink's futures by CompletableFuture in RpcGateway > -- > > Key: FLINK-7334 > URL: https://issues.apache.org/jira/browse/FLINK-7334 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7363) add hashes and signatures to the download page
Nico Kruber created FLINK-7363: -- Summary: add hashes and signatures to the download page Key: FLINK-7363 URL: https://issues.apache.org/jira/browse/FLINK-7363 Project: Flink Issue Type: Improvement Components: Project Website Reporter: Nico Kruber Assignee: Nico Kruber As part of the releases, we also generate MD5 hashes and cryptographic signatures but neither link to those nor do we explain which keys are valid release-signing keys. This should be added. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-7361) flink-web doesn't build with ruby 2.4
[ https://issues.apache.org/jira/browse/FLINK-7361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112572#comment-16112572 ] Nico Kruber edited comment on FLINK-7361 at 8/3/17 11:08 AM: - The linked PR fixes the build issues. was (Author: nicok): This fixes the build issues. > flink-web doesn't build with ruby 2.4 > - > > Key: FLINK-7361 > URL: https://issues.apache.org/jira/browse/FLINK-7361 > Project: Flink > Issue Type: Bug > Components: Project Website >Reporter: Nico Kruber >Assignee: Nico Kruber > > The dependencies pulled in by the old jekyll version do not build with ruby > 2.4 and fail with something like > {code} > yajl_ext.c:881:22: error: 'rb_cFixnum' undeclared (first use in this > function); did you mean 'rb_isalnum'? > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
[ https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112555#comment-16112555 ] ASF GitHub Bot commented on FLINK-6493: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4328#discussion_r131107598 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java --- @@ -175,14 +176,19 @@ public boolean equals(Object obj) { return false; } + if (!(obj instanceof Snapshot)) { + return false; + } + + Snapshot snapshot = (Snapshot)obj; + // need to check for nulls because serializer and config snapshots may be null on restore - return (obj instanceof Snapshot) - && name.equals(((Snapshot) obj).getName()) - && assignmentMode.equals(((Snapshot) obj).getAssignmentMode()) - && ((partitionStateSerializer == null && ((Snapshot) obj).getPartitionStateSerializer() == null) - || partitionStateSerializer.equals(((Snapshot) obj).getPartitionStateSerializer())) - && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) obj).getPartitionStateSerializerConfigSnapshot() == null) - || partitionStateSerializerConfigSnapshot.equals(((Snapshot) obj).getPartitionStateSerializerConfigSnapshot())); + return Objects.equals(name, snapshot.getName()) + && Objects.equals(assignmentMode, snapshot.getAssignmentMode()) + && ((partitionStateSerializer == null && (snapshot.getPartitionStateSerializer() == null) + || (Objects.equals(partitionStateSerializer, snapshot.getPartitionStateSerializer( + && ((partitionStateSerializerConfigSnapshot == null && (snapshot.getPartitionStateSerializerConfigSnapshot() == null) + || (Objects.equals(partitionStateSerializerConfigSnapshot, snapshot.getPartitionStateSerializerConfigSnapshot()); --- End diff -- Same for these two. 
> Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals() > - > > Key: FLINK-6493 > URL: https://issues.apache.org/jira/browse/FLINK-6493 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > Fix For: 1.4.0 > > > {code} > && ((partitionStateSerializer == null && ((Snapshot) > obj).getPartitionStateSerializer() == null) > || partitionStateSerializer.equals(((Snapshot) > obj).getPartitionStateSerializer())) > && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot() == null) > || partitionStateSerializerConfigSnapshot.equals(((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot())); > {code} > The null check for partitionStateSerializer / > partitionStateSerializerConfigSnapshot is in combination with another clause. > This may lead to NPE in the partitionStateSerializer.equals() call. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
[ https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112553#comment-16112553 ] ASF GitHub Bot commented on FLINK-6493: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4328#discussion_r131107573 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java --- @@ -175,14 +176,19 @@ public boolean equals(Object obj) { return false; } + if (!(obj instanceof Snapshot)) { + return false; + } + + Snapshot snapshot = (Snapshot)obj; + // need to check for nulls because serializer and config snapshots may be null on restore - return (obj instanceof Snapshot) - && name.equals(((Snapshot) obj).getName()) - && assignmentMode.equals(((Snapshot) obj).getAssignmentMode()) - && ((partitionStateSerializer == null && ((Snapshot) obj).getPartitionStateSerializer() == null) - || partitionStateSerializer.equals(((Snapshot) obj).getPartitionStateSerializer())) - && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) obj).getPartitionStateSerializerConfigSnapshot() == null) - || partitionStateSerializerConfigSnapshot.equals(((Snapshot) obj).getPartitionStateSerializerConfigSnapshot())); + return Objects.equals(name, snapshot.getName()) + && Objects.equals(assignmentMode, snapshot.getAssignmentMode()) + && ((partitionStateSerializer == null && (snapshot.getPartitionStateSerializer() == null) + || (Objects.equals(partitionStateSerializer, snapshot.getPartitionStateSerializer( --- End diff -- ``` && ((partitionStateSerializer == null && (snapshot.getPartitionStateSerializer() == null) || (Objects.equals(partitionStateSerializer, snapshot.getPartitionStateSerializer( ``` These two lines can just be `&& Objects.equals(partitionStateSerializer, snapshot.getPartitionStateSerializer())`, because the `Objects.equals` method returns `true` if both arguments are `null`. 
> Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals() > - > > Key: FLINK-6493 > URL: https://issues.apache.org/jira/browse/FLINK-6493 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > Fix For: 1.4.0 > > > {code} > && ((partitionStateSerializer == null && ((Snapshot) > obj).getPartitionStateSerializer() == null) > || partitionStateSerializer.equals(((Snapshot) > obj).getPartitionStateSerializer())) > && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot() == null) > || partitionStateSerializerConfigSnapshot.equals(((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot())); > {code} > The null check for partitionStateSerializer / > partitionStateSerializerConfigSnapshot is in combination with another clause. > This may lead to NPE in the partitionStateSerializer.equals() call. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6493) Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals()
[ https://issues.apache.org/jira/browse/FLINK-6493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112551#comment-16112551 ] ASF GitHub Bot commented on FLINK-6493: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4328#discussion_r131107143 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java --- @@ -175,14 +176,19 @@ public boolean equals(Object obj) { return false; } + if (!(obj instanceof Snapshot)) { + return false; + } + + Snapshot snapshot = (Snapshot)obj; + // need to check for nulls because serializer and config snapshots may be null on restore - return (obj instanceof Snapshot) - && name.equals(((Snapshot) obj).getName()) - && assignmentMode.equals(((Snapshot) obj).getAssignmentMode()) - && ((partitionStateSerializer == null && ((Snapshot) obj).getPartitionStateSerializer() == null) - || partitionStateSerializer.equals(((Snapshot) obj).getPartitionStateSerializer())) - && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) obj).getPartitionStateSerializerConfigSnapshot() == null) - || partitionStateSerializerConfigSnapshot.equals(((Snapshot) obj).getPartitionStateSerializerConfigSnapshot())); + return Objects.equals(name, snapshot.getName()) + && Objects.equals(assignmentMode, snapshot.getAssignmentMode()) --- End diff -- We don't need `null` checks for the assignment mode and name. Those are already guaranteed to be non-null in the constructor. 
> Ineffective null check in RegisteredOperatorBackendStateMetaInfo#equals() > - > > Key: FLINK-6493 > URL: https://issues.apache.org/jira/browse/FLINK-6493 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Minor > Fix For: 1.4.0 > > > {code} > && ((partitionStateSerializer == null && ((Snapshot) > obj).getPartitionStateSerializer() == null) > || partitionStateSerializer.equals(((Snapshot) > obj).getPartitionStateSerializer())) > && ((partitionStateSerializerConfigSnapshot == null && ((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot() == null) > || partitionStateSerializerConfigSnapshot.equals(((Snapshot) > obj).getPartitionStateSerializerConfigSnapshot())); > {code} > The null check for partitionStateSerializer / > partitionStateSerializerConfigSnapshot is in combination with another clause. > This may lead to NPE in the partitionStateSerializer.equals() call. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
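The null-safe comparison recommended in the review thread can be shown standalone: `Objects.equals(a, b)` returns true when both arguments are null and otherwise delegates to `a.equals(b)` without risking an NPE. The class below is a simplified stand-in for the Snapshot type, not the actual Flink code:

```java
import java.util.Objects;

public class SnapshotSketch {
    private final String name;        // guaranteed non-null by the constructor
    private final Object serializer;  // may be null after a restore

    SnapshotSketch(String name, Object serializer) {
        this.name = Objects.requireNonNull(name);
        this.serializer = serializer;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof SnapshotSketch)) {
            return false;
        }
        SnapshotSketch other = (SnapshotSketch) obj;
        // Objects.equals covers both the null/null and null/non-null cases,
        // so no hand-written null guards are needed for the nullable field.
        return name.equals(other.name)
            && Objects.equals(serializer, other.serializer);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, serializer);
    }
}
```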