[GitHub] flink issue #3736: [Flink-6013][metrics] Add Datadog HTTP metrics reporter
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/3736 @zentol ready for another round. The build fails because there are a few unrelated flake tests timing out in travis build --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3738: [FLINK-6311] [Kinesis Connector] NPE in FlinkKinesisConsu...
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/3738 @tzulitai You are very welcome . It is my pleasure ~ --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976114#comment-15976114 ] ASF GitHub Bot commented on FLINK-6311: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/3738 @tzulitai You are very welcome . It is my pleasure ~ > NPE in FlinkKinesisConsumer if source was closed before run > --- > > Key: FLINK-6311 > URL: https://issues.apache.org/jira/browse/FLINK-6311 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: mingleizhang > > This was reported by an user: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html > The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected > against the condition when the source was closed before it started running. > Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976110#comment-15976110 ] ASF GitHub Bot commented on FLINK-6311: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3738 Thanks @zhangminglei. LGTM! Merging to {{master}} and {{release-1.2}} (will merge a bit later today ;) ) .. > NPE in FlinkKinesisConsumer if source was closed before run > --- > > Key: FLINK-6311 > URL: https://issues.apache.org/jira/browse/FLINK-6311 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: mingleizhang > > This was reported by an user: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html > The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected > against the condition when the source was closed before it started running. > Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3738: [FLINK-6311] [Kinesis Connector] NPE in FlinkKinesisConsu...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3738 Thanks @zhangminglei. LGTM! Merging to {{master}} and {{release-1.2}} (will merge a bit later today ;) ) .. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3738: [FLINK-6311] [Kinesis Connector] NPE in FlinkKinesisConsu...
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/3738 @tzulitai Hi, I have updated the code. Please check it out. Thanks and appreciate it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976106#comment-15976106 ] ASF GitHub Bot commented on FLINK-6311: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/3738 @tzulitai Hi, I have updated the code. Please check it out. Thanks and appreciate it. > NPE in FlinkKinesisConsumer if source was closed before run > --- > > Key: FLINK-6311 > URL: https://issues.apache.org/jira/browse/FLINK-6311 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: mingleizhang > > This was reported by an user: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html > The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected > against the condition when the source was closed before it started running. > Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6264) Kafka consumer fails if can't find leader for partition
[ https://issues.apache.org/jira/browse/FLINK-6264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976105#comment-15976105 ] Tzu-Li (Gordon) Tai commented on FLINK-6264: [~autoaim800] I think the exception occurs when there are some partitions whose leader cannot be found, so it isn't the case that you described. I think what [~gyfora] is suggesting is that those partitions should be retried instead of failing? Gyula, could you clarify? > Kafka consumer fails if can't find leader for partition > --- > > Key: FLINK-6264 > URL: https://issues.apache.org/jira/browse/FLINK-6264 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.2.0 >Reporter: Gyula Fora > > We have observed the following error many times when brokers failed/were > restarted: > java.lang.RuntimeException: Unable to find a leader for partitions: > [Partition: KafkaTopicPartition{topic='mytopic', partition=10}, > KafkaPartitionHandle=[mytopic,10], offset=-1] > at > org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.findLeaderForPartitions(Kafka08Fetcher.java:474) > at > org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.runFetchLoop(Kafka08Fetcher.java:194) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:256) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:656) > at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6303) Documentation support build in docker on OSX
[ https://issues.apache.org/jira/browse/FLINK-6303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976031#comment-15976031 ] ASF GitHub Bot commented on FLINK-6303: --- Github user mtunique commented on the issue: https://github.com/apache/flink/pull/3719 I will follow the branch. > Documentation support build in docker on OSX > > > Key: FLINK-6303 > URL: https://issues.apache.org/jira/browse/FLINK-6303 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Tao Meng >Assignee: Tao Meng >Priority: Trivial > > Now docker only support linux. Because {{-v > "/home/$\{USER_NAME}:/home/$\{USER_NAME}"}} only support linux. We need > change {{/home/$\{USER_NAME\}}} to {{$\{HOME\}}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3719: [FLINK-6303] Documentation support build in docker on OSX
Github user mtunique commented on the issue: https://github.com/apache/flink/pull/3719 I will follow the branch. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6264) Kafka consumer fails if can't find leader for partition
[ https://issues.apache.org/jira/browse/FLINK-6264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976012#comment-15976012 ] mingleizhang commented on FLINK-6264: - Hi, I would not think it is a bug when brokers failer and restarted. That is because {code}unassignedPartitions.size() > 0{code} and then you got this exception. [~tzulitai] Hi, How about your opinion about this issue ? > Kafka consumer fails if can't find leader for partition > --- > > Key: FLINK-6264 > URL: https://issues.apache.org/jira/browse/FLINK-6264 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.2.0 >Reporter: Gyula Fora > > We have observed the following error many times when brokers failed/were > restarted: > java.lang.RuntimeException: Unable to find a leader for partitions: > [Partition: KafkaTopicPartition{topic='mytopic', partition=10}, > KafkaPartitionHandle=[mytopic,10], offset=-1] > at > org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.findLeaderForPartitions(Kafka08Fetcher.java:474) > at > org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.runFetchLoop(Kafka08Fetcher.java:194) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:256) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:656) > at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6333) Utilize Bloomfilters in RocksDb
[ https://issues.apache.org/jira/browse/FLINK-6333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976008#comment-15976008 ] Fang Yong commented on FLINK-6333: -- Now flink uses RocksDb jni 4.11.2, and already support setFilter(new BloomFilter()) method in BlockBasedTableConfig. Can bloomfilters be used in rocksdb 4.11.2 directly? Are there any other improvements from current version to RocksDb 5.2.1+? > Utilize Bloomfilters in RocksDb > --- > > Key: FLINK-6333 > URL: https://issues.apache.org/jira/browse/FLINK-6333 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu > > Bloom Filters would speed up RocksDb lookups. > When we upgrade to RocksDb 5.2.1+, we would be able to do: > {code} > new BlockBasedTableConfig() > .setBlockCacheSize(blockCacheSize) > .setBlockSize(blockSize) > .setFilter(new BloomFilter()) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975987#comment-15975987 ] ASF GitHub Bot commented on FLINK-6228: --- GitHub user sunjincheng121 opened a pull request: https://github.com/apache/flink/pull/3743 [FLINK-6228][table] Integrating the OVER windows in the Table API (st… In this PR I had integrating the OVER windows in the Table API, Implementation of the syntax and use examples are as follows: * Syntax: ``` table .overWindows( (Rows|Range [ partitionBy value_expression , ... [ n ]] [ orderBy order_by_expression] (preceding UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW) [following UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW] as alias,...[n]) ) .select( [col1,...[n]], (agg(col1) OVER overWindowAlias, … [n]) ``` * examples: ``` // Rows clause table .window(Over partitionBy 'c orderBy 'rowTime preceding 2.rows as 'w1) .select( 'c, 'b.count over 'w1 as 'countB, 'e.sum over 'w1 as 'sumE) // Range clause table .window(Over partitionBy 'c orderBy 'rowTime preceding 2.milli as 'w1) .select( 'c, 'b.count over 'w1 as 'countB, 'e.sum over 'w1 as 'sumE) ``` * More detail Info : https://docs.google.com/document/d/13Z-Ovx3jwtmzkSweJyGkMy751BouhuJ29Y1CTNZt2DI/edit# NOTE: The documentation of the OVER tableAPI not included in this PR. - [x] General - The pull request references the related JIRA issue ("[FLINK-6228][table] Integrating the OVER windows in the Table API") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/sunjincheng121/flink FLINK-6228-PR Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3743.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3743 commit 03d4153be93d505cdb47d174e5aafe10eb93a45f Author: sunjincheng121Date: 2017-04-13T09:36:18Z [FLINK-6228][table] Integrating the OVER windows in the Table API (stream) > Integrating the OVER windows in the Table API > - > > Key: FLINK-6228 > URL: https://issues.apache.org/jira/browse/FLINK-6228 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > Syntax: > {code} > table >.overWindows( > (Rows|Range [ partitionBy value_expression , ... [ n ]] [ orderBy > order_by_expression] > (preceding > UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW) > [following > UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW] > as alias,...[n]) >) > .select( [col1,...[n]], (agg(col1) OVER overWindowAlias, … [n]) > {code} > Implement restrictions: > * All OVER clauses in the same SELECT clause must be exactly the same. > * The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > * The ORDER BY Before the > [FLINK-5884|https://issues.apache.org/jira/browse/FLINK-5884] implementation > orderBy may only have ‘rowtime/’proctime(for stream)/‘specific-time-field(for > batch). > * FOLLOWING is not supported. > User interface design document [See | > https://docs.google.com/document/d/13Z-Ovx3jwtmzkSweJyGkMy751BouhuJ29Y1CTNZt2DI/edit#] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
GitHub user sunjincheng121 opened a pull request: https://github.com/apache/flink/pull/3743 [FLINK-6228][table] Integrating the OVER windows in the Table API (st⦠In this PR I had integrating the OVER windows in the Table API, Implementation of the syntax and use examples are as followsï¼ * Syntax: ``` table .overWindows( (Rows|Range [ partitionBy value_expression , ... [ n ]] [ orderBy order_by_expression] (preceding UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW) [following UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW] as alias,...[n]) ) .select( [col1,...[n]], (agg(col1) OVER overWindowAlias, ⦠[n]) ``` * examples: ``` // Rows clause table .window(Over partitionBy 'c orderBy 'rowTime preceding 2.rows as 'w1) .select( 'c, 'b.count over 'w1 as 'countB, 'e.sum over 'w1 as 'sumE) // Range clause table .window(Over partitionBy 'c orderBy 'rowTime preceding 2.milli as 'w1) .select( 'c, 'b.count over 'w1 as 'countB, 'e.sum over 'w1 as 'sumE) ``` * More detail Info : https://docs.google.com/document/d/13Z-Ovx3jwtmzkSweJyGkMy751BouhuJ29Y1CTNZt2DI/edit# NOTE: The documentation of the OVER tableAPI not included in this PR. - [x] General - The pull request references the related JIRA issue ("[FLINK-6228][table] Integrating the OVER windows in the Table API") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/sunjincheng121/flink FLINK-6228-PR Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3743.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3743 commit 03d4153be93d505cdb47d174e5aafe10eb93a45f Author: sunjincheng121Date: 2017-04-13T09:36:18Z [FLINK-6228][table] Integrating the OVER windows in the Table API (stream) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6120) Implement heartbeat logic between JobManager and ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-6120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975934#comment-15975934 ] ASF GitHub Bot commented on FLINK-6120: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/3645 @tillrohrmann , just a kind reminder for this last heartbeat PR. > Implement heartbeat logic between JobManager and ResourceManager > > > Key: FLINK-6120 > URL: https://issues.apache.org/jira/browse/FLINK-6120 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Reporter: zhijiang >Assignee: zhijiang > > It is part of work for Flip-6. > The HeartbeatManager is mainly used for monitoring heartbeat target and > reporting payloads. > For {{ResourceManager}} side, it would trigger monitoring the > {{HeartbeatTarget}} when receive registration from {{JobManager}}, and > schedule a task to {{requestHeartbeat}} at interval time. If not receive > heartbeat response within duration time, the {{HeartbeatListener}} will > notify heartbeat timeout, then the {{ResourceManager}} should remove the > internal registered {{JobManager}}. > For {{JobManager}} side, it would trigger monitoring the {{HeartbeatTarget}} > when receive registration acknowledgement from {{ResourceManager}}. An it > will also be notified heartbeat timeout if not receive heartbeat request from > {{ResourceManager}} within duration time. > The current implementation will not interact payloads via heartbeat, and it > can be added if needed future. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3645: [FLINK-6120][Distributed Coordinator]Implement heartbeat ...
Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/3645 @tillrohrmann , just a kind reminder for this last heartbeat PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-6334) Refactoring UDTF interface
[ https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shaoxuan Wang updated FLINK-6334: - Description: The current UDTF leverages the table.join(expression) interface, which is not a proper interface in terms of semantics. We would like to refactor this to let UDTF use table.join(table) interface. Very briefly, UDTF's apply method will return a Table Type, so Join(UDTF('a, 'b, ...) as 'c) shall be viewed as join(Table) (was: UDTF's apply method returns a Table Type, so Join(UDTF('a, 'b, ...) as 'c) shall be viewed as join(Table)) > Refactoring UDTF interface > -- > > Key: FLINK-6334 > URL: https://issues.apache.org/jira/browse/FLINK-6334 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li > > The current UDTF leverages the table.join(expression) interface, which is not > a proper interface in terms of semantics. We would like to refactor this to > let UDTF use table.join(table) interface. Very briefly, UDTF's apply method > will return a Table Type, so Join(UDTF('a, 'b, ...) as 'c) shall be viewed as > join(Table) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6334) Refactoring UDTF interface
Ruidong Li created FLINK-6334: - Summary: Refactoring UDTF interface Key: FLINK-6334 URL: https://issues.apache.org/jira/browse/FLINK-6334 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Ruidong Li UDTF's apply method returns a Table Type, so Join(UDTF('a, 'b, ...) as 'c) shall be viewed as join(Table) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6333) Utilize Bloomfilters in RocksDb
Ted Yu created FLINK-6333: - Summary: Utilize Bloomfilters in RocksDb Key: FLINK-6333 URL: https://issues.apache.org/jira/browse/FLINK-6333 Project: Flink Issue Type: Improvement Reporter: Ted Yu Bloom Filters would speed up RocksDb lookups. When we upgrade to RocksDb 5.2.1+, we would be able to do: {code} new BlockBasedTableConfig() .setBlockCacheSize(blockCacheSize) .setBlockSize(blockSize) .setFilter(new BloomFilter()) {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6295) use LoadingCache instead of WeakHashMap to lower latency
[ https://issues.apache.org/jira/browse/FLINK-6295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975860#comment-15975860 ] ASF GitHub Bot commented on FLINK-6295: --- Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3709 @zentol How do we know if a job requested is supended or not, as the status of jobs in backend is alway changing? > use LoadingCache instead of WeakHashMap to lower latency > > > Key: FLINK-6295 > URL: https://issues.apache.org/jira/browse/FLINK-6295 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Reporter: Tao Wang >Assignee: Tao Wang > > Now in ExecutionGraphHolder, which is used in many handlers, we use a > WeakHashMap to cache ExecutionGraph(s), which is only sensitive to garbage > collection. > The latency is too high when JVM do GC rarely, which will make status of jobs > or its tasks unmatched with the real ones. > LoadingCache is a common used cache implementation from guava lib, we can use > its time based eviction to lower latency of status update. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...
Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3709 @zentol How do we know if a job requested is supended or not, as the status of jobs in backend is alway changing? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Closed] (FLINK-6327) Bug in CommonCalc's estimateRowCount() method
[ https://issues.apache.org/jira/browse/FLINK-6327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-6327. - Resolution: Fixed Fix Version/s: 1.3.0 > Bug in CommonCalc's estimateRowCount() method > - > > Key: FLINK-6327 > URL: https://issues.apache.org/jira/browse/FLINK-6327 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: godfrey he >Assignee: godfrey he > Fix For: 1.3.0 > > > {{(rowCnt * 0.75).min(1.0)}} should be changed to {{(rowCnt * 0.75).max(1.0)}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6327) Bug in CommonCalc's estimateRowCount() method
[ https://issues.apache.org/jira/browse/FLINK-6327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975845#comment-15975845 ] ASF GitHub Bot commented on FLINK-6327: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3740 > Bug in CommonCalc's estimateRowCount() method > - > > Key: FLINK-6327 > URL: https://issues.apache.org/jira/browse/FLINK-6327 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: godfrey he >Assignee: godfrey he > Fix For: 1.3.0 > > > {{(rowCnt * 0.75).min(1.0)}} should be changed to {{(rowCnt * 0.75).max(1.0)}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3740: [FLINK-6327] [table] Bug in CommonCalc's estimateR...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3740 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-6327) Bug in CommonCalc's estimateRowCount() method
[ https://issues.apache.org/jira/browse/FLINK-6327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-6327: -- Component/s: Table API & SQL > Bug in CommonCalc's estimateRowCount() method > - > > Key: FLINK-6327 > URL: https://issues.apache.org/jira/browse/FLINK-6327 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: godfrey he >Assignee: godfrey he > > {{(rowCnt * 0.75).min(1.0)}} should be changed to {{(rowCnt * 0.75).max(1.0)}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112344921 --- Diff: flink-metrics/flink-metrics-datadog/pom.xml --- @@ -0,0 +1,79 @@ + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + + org.apache.flink + flink-metrics + 1.3-SNAPSHOT + .. + + +org.apache.flink +flink-metrics-datadog +1.3-SNAPSHOT + --- End diff -- okhttp and okio are self contained, and they don't have any other dependencies. can you please elaborate more "you should create a fat-jar using the maven-shade-plugin"? I have never done mvn jar shading before... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112344622 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.datadog.utils.TimestampUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * Abstract metric of Datadog for serialization + * */ +public abstract class DMetric { + private final String metric; // Metric name --- End diff -- yes, exactly --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112344522 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.datadog.utils.SerializationUtils; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Metric Reporter for Datadog + * + * Variables in metrics scope will be sent to Datadog as tags + * */ +public class DatadogHttpReporter implements MetricReporter, CharacterFilter, Scheduled { + private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class); + + // Both Flink's Gauge and Meter values are taken as gauge in Datadog + private final Mapgauges = new ConcurrentHashMap<>(); + private final Map counters = new ConcurrentHashMap<>(); + private final Map meters = new ConcurrentHashMap<>(); + + private DatadogHttpClient client; + private List configTags; + + public static final String API_KEY = "apikey"; + public static final String TAGS = "tags"; + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + final String name = group.getMetricIdentifier(metricName, this); --- End diff -- make sense. Refactored as you suggested. btw, what's CharacterFilter#filterCharacters() for? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112344359 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.datadog.utils.SerializationUtils; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Metric Reporter for Datadog + * + * Variables in metrics scope will be sent to Datadog as tags + * */ +public class DatadogHttpReporter implements MetricReporter, CharacterFilter, Scheduled { + private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class); + + // Both Flink's Gauge and Meter values are taken as gauge in Datadog + private final Mapgauges = new ConcurrentHashMap<>(); + private final Map counters = new ConcurrentHashMap<>(); + private final Map meters = new ConcurrentHashMap<>(); + + private DatadogHttpClient client; + private List configTags; + + public static final String API_KEY = "apikey"; + public static final String TAGS = "tags"; + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + final String name = group.getMetricIdentifier(metricName, this); + + List tags = new ArrayList<>(configTags); + tags.addAll(getTagsFromMetricGroup(group)); + + synchronized (this) { + if (metric instanceof Counter) { + Counter c = (Counter) metric; + counters.put(c, new DCounter(c, name, tags)); + } else if (metric instanceof Gauge) { + Gauge g = (Gauge) metric; + gauges.put(g, new DGauge(g, name, tags)); + } else if(metric instanceof Meter) { + Meter m = (Meter) metric; + // Only consider rate + meters.put(m, new DMeter(m, name, tags)); + } else if (metric instanceof Histogram) { + LOGGER.warn("Cannot add {} because Datadog HTTP API doesn't support Histogram", metricName); + } else { + LOGGER.warn("Cannot add unknown metric type {}. This indicates that the reporter " + + "does not support this metric type.", metric.getClass().getName()); + } + } + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + synchronized (this) { + if (metric instanceof Counter) { + counters.remove(metric); + } else if (metric instanceof Gauge) { + gauges.remove(metric); + } else if (metric instanceof Meter) { + meters.remove(metric); + } else if (metric
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112344296 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/utils/SerializationUtils.java --- @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog.utils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +public class SerializationUtils { --- End diff -- moved to DatadogHttpClient --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112344276 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/utils/TimestampUtils.java --- @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog.utils; + +public class TimestampUtils { --- End diff -- moved to DMetric --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112342781 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.datadog.utils.SerializationUtils; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Metric Reporter for Datadog + * + * Variables in metrics scope will be sent to Datadog as tags + * */ +public class DatadogHttpReporter implements MetricReporter, CharacterFilter, Scheduled { + private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class); + + // Both Flink's Gauge and Meter values are taken as gauge in Datadog + private final Mapgauges = new ConcurrentHashMap<>(); + private final Map counters = new ConcurrentHashMap<>(); + private final Map meters = new ConcurrentHashMap<>(); + + private DatadogHttpClient client; + private List configTags; + + public static final String API_KEY = "apikey"; + public static final String TAGS = "tags"; + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + final String name = group.getMetricIdentifier(metricName, this); + + List tags = new ArrayList<>(configTags); + tags.addAll(getTagsFromMetricGroup(group)); + + synchronized (this) { + if (metric instanceof Counter) { + Counter c = (Counter) metric; + counters.put(c, new DCounter(c, name, tags)); + } else if (metric instanceof Gauge) { + Gauge g = (Gauge) metric; + gauges.put(g, new DGauge(g, name, tags)); + } else if(metric instanceof Meter) { + Meter m = (Meter) metric; + // Only consider rate + meters.put(m, new DMeter(m, name, tags)); + } else if (metric instanceof Histogram) { + LOGGER.warn("Cannot add {} because Datadog HTTP API doesn't support Histogram", metricName); + } else { + LOGGER.warn("Cannot add unknown metric type {}. This indicates that the reporter " + + "does not support this metric type.", metric.getClass().getName()); + } + } + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + synchronized (this) { + if (metric instanceof Counter) { + counters.remove(metric); + } else if (metric instanceof Gauge) { + gauges.remove(metric); + } else if (metric instanceof Meter) { + meters.remove(metric); + } else if (metric
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112341874 --- Diff: flink-metrics/flink-metrics-datadog/pom.xml --- @@ -0,0 +1,79 @@ + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + + org.apache.flink + flink-metrics + 1.3-SNAPSHOT + .. + + +org.apache.flink +flink-metrics-datadog +1.3-SNAPSHOT + + + + org.apache.flink + flink-metrics-core + ${project.version} + provided + + + + com.squareup.okhttp3 + okhttp + 3.6.0 + + + + com.squareup.okio + okio + 1.11.0 + + + + com.fasterxml.jackson.core + jackson-databind --- End diff -- done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112341697 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.RequestBody; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * Http client talking to Datadog + * */ +public class DatadogHttpClient{ + private static final String SERIES_URL_FORMAT = "https://app.datadoghq.com/api/v1/series?api_key=%s;; + private static final String VALIDATE_URL_FORMAT = "https://app.datadoghq.com/api/v1/validate?api_key=%s;; + private static final MediaType MEDIA_TYPE = MediaType.parse("application/json; charset=utf-8"); + private static final int TIMEOUT = 5; + + private final String seriesUrl; + private final String validateUrl; + private final OkHttpClient client; + private final String apiKey; + + public DatadogHttpClient(String dgApiKey) { + if(dgApiKey == null || dgApiKey.isEmpty()) { + throw new IllegalArgumentException( + "Invalid API key:" + dgApiKey); + } + apiKey = dgApiKey; + client = new OkHttpClient.Builder() + .connectTimeout(TIMEOUT, TimeUnit.SECONDS) + .writeTimeout(TIMEOUT, TimeUnit.SECONDS) + .readTimeout(TIMEOUT, TimeUnit.SECONDS) + .build(); + + seriesUrl = String.format(SERIES_URL_FORMAT, apiKey); + validateUrl = String.format(VALIDATE_URL_FORMAT, apiKey); + validateApiKey(); + } + + private void validateApiKey() { + Request r = new Request.Builder().url(validateUrl).get().build(); + + try { + Response response = client.newCall(r).execute(); + if(!response.isSuccessful()) { --- End diff -- I can't think of any case it fails and not because invalid key. If network or Datadog endpoint is down, the response will just timeout, and the code path goes to catch clause. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112341399 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.RequestBody; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * Http client talking to Datadog + * */ +public class DatadogHttpClient{ + private static final String SERIES_URL_FORMAT = "https://app.datadoghq.com/api/v1/series?api_key=%s;; + private static final String VALIDATE_URL_FORMAT = "https://app.datadoghq.com/api/v1/validate?api_key=%s;; + private static final MediaType MEDIA_TYPE = MediaType.parse("application/json; charset=utf-8"); + private static final int TIMEOUT = 5; + + private final String seriesUrl; + private final String validateUrl; + private final OkHttpClient client; + private final String apiKey; + + public DatadogHttpClient(String dgApiKey) { + if(dgApiKey == null || dgApiKey.isEmpty()) { + throw new IllegalArgumentException( + "Invalid API key:" + dgApiKey); + } + apiKey = dgApiKey; + client = new OkHttpClient.Builder() + .connectTimeout(TIMEOUT, TimeUnit.SECONDS) + .writeTimeout(TIMEOUT, TimeUnit.SECONDS) + .readTimeout(TIMEOUT, TimeUnit.SECONDS) + .build(); + + seriesUrl = String.format(SERIES_URL_FORMAT, apiKey); + validateUrl = String.format(VALIDATE_URL_FORMAT, apiKey); + validateApiKey(); + } + + private void validateApiKey() { + Request r = new Request.Builder().url(validateUrl).get().build(); + + try { + Response response = client.newCall(r).execute(); + if(!response.isSuccessful()) { --- End diff -- Make sense. To broaden failure scenarios a bit --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112341238 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.datadog.utils.TimestampUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * Abstract metric of Datadog for serialization + * */ +public abstract class DMetric { + private final String metric; // Metric name + private final MetricType type; + private final List tags; + + public DMetric(MetricType metricType, String metric, List tags) { + this.type = metricType; + this.metric = metric; + this.tags = tags; + } + + public MetricType getType() { + return type; --- End diff -- That's the python api, which already encapsulate metric types. I'm using the HTTP API, and it's here http://docs.datadoghq.com/api/?lang=console#metrics-post --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112340825 --- Diff: docs/monitoring/metrics.md --- @@ -436,6 +436,35 @@ metrics.reporter.stsd.port: 8125 {% endhighlight %} +### Datadog (org.apache.flink.metrics.datadog.DatadogHttpReporter) + +In order to use this reporter you must copy `/opt/flink-metrics-datadog-{{site.version}}.jar` into the `/lib` folder +of your Flink distribution. + +Parameters: + +- `apikey` - the Datadog API key +- `tags` - (optional) the global tags that will be applied to metrics when sending to Datadog. Tags should be separated by comma only + +Example configuration: + +{% highlight yaml %} + +metrics.reporters: dghttp +metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter +metrics.reporter.dghttp.apikey: xxx +metrics.reporter.dghttp.tags: myflinkapp,prod + +// , , , , , will be sent to Datadog as tags +metrics.scope.jm: .jobmanager --- End diff -- simplified it and moved out of code highlight. I believe it's necessary to explain to users how to find and filter tags in Datadog, in order to ramp them up faster and remove any potential friction. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112340582 --- Diff: docs/monitoring/metrics.md --- @@ -436,6 +436,35 @@ metrics.reporter.stsd.port: 8125 {% endhighlight %} +### Datadog (org.apache.flink.metrics.datadog.DatadogHttpReporter) + +In order to use this reporter you must copy `/opt/flink-metrics-datadog-{{site.version}}.jar` into the `/lib` folder +of your Flink distribution. + +Parameters: + +- `apikey` - the Datadog API key +- `tags` - (optional) the global tags that will be applied to metrics when sending to Datadog. Tags should be separated by comma only + +Example configuration: + +{% highlight yaml %} + +metrics.reporters: dghttp +metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter +metrics.reporter.dghttp.apikey: xxx +metrics.reporter.dghttp.tags: myflinkapp,prod + +// , , , , , will be sent to Datadog as tags +metrics.scope.jm: .jobmanager --- End diff -- removed --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112340440 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.datadog.utils.SerializationUtils; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Metric Reporter for Datadog + * + * Variables in metrics scope will be sent to Datadog as tags + * */ +public class DatadogHttpReporter implements MetricReporter, CharacterFilter, Scheduled { + private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class); + + // Both Flink's Gauge and Meter values are taken as gauge in Datadog + private final Mapgauges = new ConcurrentHashMap<>(); + private final Map counters = new ConcurrentHashMap<>(); + private final Map meters = new ConcurrentHashMap<>(); + + private DatadogHttpClient client; + private List configTags; + + public static final String API_KEY = "apikey"; + public static final String TAGS = "tags"; + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + final String name = group.getMetricIdentifier(metricName, this); + + List tags = new ArrayList<>(configTags); + tags.addAll(getTagsFromMetricGroup(group)); + + synchronized (this) { + if (metric instanceof Counter) { + Counter c = (Counter) metric; + counters.put(c, new DCounter(c, name, tags)); + } else if (metric instanceof Gauge) { + Gauge g = (Gauge) metric; + gauges.put(g, new DGauge(g, name, tags)); + } else if(metric instanceof Meter) { + Meter m = (Meter) metric; + // Only consider rate + meters.put(m, new DMeter(m, name, tags)); + } else if (metric instanceof Histogram) { + LOGGER.warn("Cannot add {} because Datadog HTTP API doesn't support Histogram", metricName); + } else { + LOGGER.warn("Cannot add unknown metric type {}. This indicates that the reporter " + + "does not support this metric type.", metric.getClass().getName()); + } + } + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + synchronized (this) { + if (metric instanceof Counter) { + counters.remove(metric); + } else if (metric instanceof Gauge) { + gauges.remove(metric); + } else if (metric instanceof Meter) { + meters.remove(metric); + } else if (metric
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112340350 --- Diff: flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpReporterTests.java --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class DatadogHttpReporterTests { + private static DatadogHttpReporter reporter; + + @BeforeClass --- End diff -- sure --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112340221 --- Diff: flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpReporterTests.java --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class DatadogHttpReporterTests { + private static DatadogHttpReporter reporter; + + @BeforeClass + public static void init() { + reporter = new DatadogHttpReporter(); + } + + @Test + public void testFilterChars() { + assertEquals("", reporter.filterCharacters("")); +assertEquals("abc", reporter.filterCharacters("abc")); --- End diff -- hmm... it's showing fine on my machine. Reindent it any way... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112340046 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.RequestBody; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * Http client talking to Datadog + * */ +public class DatadogHttpClient{ + private static final String SERIES_URL_FORMAT = "https://app.datadoghq.com/api/v1/series?api_key=%s;; + private static final String VALIDATE_URL_FORMAT = "https://app.datadoghq.com/api/v1/validate?api_key=%s;; + private static final MediaType MEDIA_TYPE = MediaType.parse("application/json; charset=utf-8"); + private static final int TIMEOUT = 5; + + private final String seriesUrl; + private final String validateUrl; + private final OkHttpClient client; --- End diff -- You might be looking at shutting down client cache. As the doc there mentioned, "Shutdown isn't necessary" :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112339524 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.Meter; + +import java.util.List; + +/** + * Mapping of meter between Flink and Datadog + * + * Only consider rate of the meter, due to Datadog HTTP API's limited support of meter + * */ +public class DMeter extends DMetric { + private final Meter meter; --- End diff -- I double checked. They are the same using tabs. Maybe a github UI issue if you saw something different --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112339291 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.datadog.utils.SerializationUtils; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Metric Reporter for Datadog + * + * Variables in metrics scope will be sent to Datadog as tags + * */ +public class DatadogHttpReporter implements MetricReporter, CharacterFilter, Scheduled { + private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class); + + // Both Flink's Gauge and Meter values are taken as gauge in Datadog + private final Mapgauges = new ConcurrentHashMap<>(); + private final Map counters = new ConcurrentHashMap<>(); + private final Map meters = new ConcurrentHashMap<>(); + + private DatadogHttpClient client; + private List configTags; + + public static final String API_KEY = "apikey"; + public static final String TAGS = "tags"; + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + final String name = group.getMetricIdentifier(metricName, this); + + List tags = new ArrayList<>(configTags); + tags.addAll(getTagsFromMetricGroup(group)); + + synchronized (this) { + if (metric instanceof Counter) { + Counter c = (Counter) metric; + counters.put(c, new DCounter(c, name, tags)); + } else if (metric instanceof Gauge) { + Gauge g = (Gauge) metric; + gauges.put(g, new DGauge(g, name, tags)); --- End diff -- I made DGauge a generic one, rather than tied to any specific type. The reason being that I found sometimes the metric in this line doesn't have any value, and will throw NullPointException when calling Gauge#getValue(). The part that checks if a Gauge returns a number is in DatadogHttpReporter#report() --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112338938 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.datadog.utils.SerializationUtils; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Metric Reporter for Datadog + * + * Variables in metrics scope will be sent to Datadog as tags + * */ +public class DatadogHttpReporter implements MetricReporter, CharacterFilter, Scheduled { + private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class); + + // Both Flink's Gauge and Meter values are taken as gauge in Datadog + private final Mapgauges = new ConcurrentHashMap<>(); + private final Map counters = new ConcurrentHashMap<>(); + private final Map meters = new ConcurrentHashMap<>(); + + private DatadogHttpClient client; + private List configTags; + + public static final String API_KEY = "apikey"; + public static final String TAGS = "tags"; + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + final String name = group.getMetricIdentifier(metricName, this); + + List tags = new ArrayList<>(configTags); + tags.addAll(getTagsFromMetricGroup(group)); + + synchronized (this) { --- End diff -- sure --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-6332) Upgrade Scala version to 2.11.11
Ted Yu created FLINK-6332: - Summary: Upgrade Scala version to 2.11.11 Key: FLINK-6332 URL: https://issues.apache.org/jira/browse/FLINK-6332 Project: Flink Issue Type: Improvement Reporter: Ted Yu Priority: Minor Currently scala-2.11 profile uses Scala 2.11.7 2.11.11 is the most recent version. This issue is to upgrade to Scala 2.11.11 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975645#comment-15975645 ] ASF GitHub Bot commented on FLINK-6311: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/3738 @StephanEwen @tzulitai Thanks for telling me so useful information. I will fix it soon enough. Very appreciate it. > NPE in FlinkKinesisConsumer if source was closed before run > --- > > Key: FLINK-6311 > URL: https://issues.apache.org/jira/browse/FLINK-6311 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: mingleizhang > > This was reported by an user: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html > The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected > against the condition when the source was closed before it started running. > Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3738: [FLINK-6311] [Kinesis Connector] NPE in FlinkKinesisConsu...
Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/3738 @StephanEwen @tzulitai Thanks for telling me so useful information. I will fix it soon enough. Very appreciate it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112305453 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/utils/TimestampUtils.java --- @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog.utils; + +public class TimestampUtils { --- End diff -- This class seems overkill; could also be a utility method in the DatadogHttpClient. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112305482 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/utils/SerializationUtils.java --- @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog.utils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +public class SerializationUtils { --- End diff -- This class seems overkill; could also be a utility method in the DatadogHttpClient. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112305118 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.datadog.utils.SerializationUtils; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Metric Reporter for Datadog + * + * Variables in metrics scope will be sent to Datadog as tags + * */ +public class DatadogHttpReporter implements MetricReporter, CharacterFilter, Scheduled { + private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class); + + // Both Flink's Gauge and Meter values are taken as gauge in Datadog + private final Mapgauges = new ConcurrentHashMap<>(); + private final Map counters = new ConcurrentHashMap<>(); + private final Map meters = new ConcurrentHashMap<>(); + + private DatadogHttpClient client; + private List configTags; + + public static final String API_KEY = "apikey"; + public static final String TAGS = "tags"; + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + final String name = group.getMetricIdentifier(metricName, this); + + List tags = new ArrayList<>(configTags); + tags.addAll(getTagsFromMetricGroup(group)); + + synchronized (this) { + if (metric instanceof Counter) { + Counter c = (Counter) metric; + counters.put(c, new DCounter(c, name, tags)); + } else if (metric instanceof Gauge) { + Gauge g = (Gauge) metric; + gauges.put(g, new DGauge(g, name, tags)); + } else if(metric instanceof Meter) { + Meter m = (Meter) metric; + // Only consider rate + meters.put(m, new DMeter(m, name, tags)); + } else if (metric instanceof Histogram) { + LOGGER.warn("Cannot add {} because Datadog HTTP API doesn't support Histogram", metricName); + } else { + LOGGER.warn("Cannot add unknown metric type {}. This indicates that the reporter " + + "does not support this metric type.", metric.getClass().getName()); + } + } + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + synchronized (this) { + if (metric instanceof Counter) { + counters.remove(metric); + } else if (metric instanceof Gauge) { + gauges.remove(metric); + } else if (metric instanceof Meter) { + meters.remove(metric); + } else if (metric
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112304098 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.RequestBody; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * Http client talking to Datadog + * */ +public class DatadogHttpClient{ + private static final String SERIES_URL_FORMAT = "https://app.datadoghq.com/api/v1/series?api_key=%s;; + private static final String VALIDATE_URL_FORMAT = "https://app.datadoghq.com/api/v1/validate?api_key=%s;; + private static final MediaType MEDIA_TYPE = MediaType.parse("application/json; charset=utf-8"); + private static final int TIMEOUT = 5; + + private final String seriesUrl; + private final String validateUrl; + private final OkHttpClient client; + private final String apiKey; + + public DatadogHttpClient(String dgApiKey) { + if(dgApiKey == null || dgApiKey.isEmpty()) { + throw new IllegalArgumentException( + "Invalid API key:" + dgApiKey); + } + apiKey = dgApiKey; + client = new OkHttpClient.Builder() + .connectTimeout(TIMEOUT, TimeUnit.SECONDS) + .writeTimeout(TIMEOUT, TimeUnit.SECONDS) + .readTimeout(TIMEOUT, TimeUnit.SECONDS) + .build(); + + seriesUrl = String.format(SERIES_URL_FORMAT, apiKey); + validateUrl = String.format(VALIDATE_URL_FORMAT, apiKey); + validateApiKey(); + } + + private void validateApiKey() { + Request r = new Request.Builder().url(validateUrl).get().build(); + + try { + Response response = client.newCall(r).execute(); + if(!response.isSuccessful()) { --- End diff -- The response can fail for reasons other than the key being invalid, no? If so, the exception message should state "Failed to validate API key. ". --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout
[ https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975434#comment-15975434 ] Seth Wiesman commented on FLINK-6315: - [~StephanEwen] So I guess this is the crux of the real issue, eventually consistent file systems cannot perform renames but only PUTS. If one checkpoint times out the next needs to re-PUT its files into the next valid bucket. I had considered something similar to what you are doing with incremental checkpointing, if checkpoint 3 arrives before 2 finishes than build on checkpoint 1. The reason I backed away from is that in the degenerate case where no checkpoint ever completes before the next begins no buckets would be committed and no data would ever be deleted from the buffer. Is this a case you deal with for incremental checkpointing or is checkpointing that frequently considered an anti-pattern and not dealt with? Of course the easy solution would be to say that the `EventualyConsistentSink` only provides exactly once when `maxConcurrentCheckpoints` is set to 1 but that doesn't feel like a satisfactory answer to me. > Notify on checkpoint timeout > - > > Key: FLINK-6315 > URL: https://issues.apache.org/jira/browse/FLINK-6315 > Project: Flink > Issue Type: New Feature > Components: Core >Reporter: Seth Wiesman >Assignee: Seth Wiesman > > A common use case when writing a custom operator that outputs data to some > third party location to partially output on checkpoint and then commit on > notifyCheckpointComplete. If that external system does not gracefully handle > rollbacks (such as Amazon S3 not allowing consistent delete operations) then > that data needs to be handled by the next checkpoint. > The idea is to add a new interface similar to CheckpointListener that > provides a callback when the CheckpointCoordinator timesout a checkpoint > {code:java} > /** > * This interface must be implemented by functions/operations that want to > receive > * a notification if a checkpoint has been {@link > org.apache.flink.runtime.checkpoint.CheckpointCoordinator} > */ > public interface CheckpointTimeoutListener { > /** >* This method is called as a notification if a distributed checkpoint > has been timed out. >* >* @param checkpointId The ID of the checkpoint that has been timed out. >* @throws Exception >*/ > void notifyCheckpointTimeout(long checkpointId) throws Exception; > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112303033 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.datadog.utils.TimestampUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * Abstract metric of Datadog for serialization + * */ +public abstract class DMetric { + private final String metric; // Metric name + private final MetricType type; + private final List tags; + + public DMetric(MetricType metricType, String metric, List tags) { + this.type = metricType; + this.metric = metric; + this.tags = tags; + } + + public MetricType getType() { + return type; --- End diff -- what is the purpose of this field from DataDog's perspective? I don't see it being mentioned under http://docs.datadoghq.com/api/#metrics-post. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975433#comment-15975433 ] ASF GitHub Bot commented on FLINK-6311: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3738#discussion_r112302446 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -408,12 +408,15 @@ public void runFetcher() throws Exception { */ public void shutdownFetcher() { running = false; - mainThread.interrupt(); // the main thread may be sleeping for the discovery interval - + if (mainThread != null) { + mainThread.interrupt(); // the main thread may be sleeping for the discovery interval + } if (LOG.isInfoEnabled()) { LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask); } - shardConsumersExecutor.shutdownNow(); + if (shardConsumersExecutor != null) { --- End diff -- Actually I think this is a redundant `null` check, because `shardConsumersExecutor` is final. It should never be null, so this null check might actually be confusing to other readers. > NPE in FlinkKinesisConsumer if source was closed before run > --- > > Key: FLINK-6311 > URL: https://issues.apache.org/jira/browse/FLINK-6311 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: mingleizhang > > This was reported by an user: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html > The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected > against the condition when the source was closed before it started running. > Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3738: [FLINK-6311] [Kinesis Connector] NPE in FlinkKines...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3738#discussion_r112302446 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -408,12 +408,15 @@ public void runFetcher() throws Exception { */ public void shutdownFetcher() { running = false; - mainThread.interrupt(); // the main thread may be sleeping for the discovery interval - + if (mainThread != null) { + mainThread.interrupt(); // the main thread may be sleeping for the discovery interval + } if (LOG.isInfoEnabled()) { LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask); } - shardConsumersExecutor.shutdownNow(); + if (shardConsumersExecutor != null) { --- End diff -- Actually I think this is a redundant `null` check, because `shardConsumersExecutor` is final. It should never be null, so this null check might actually be confusing to other readers. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112301573 --- Diff: flink-metrics/flink-metrics-datadog/pom.xml --- @@ -0,0 +1,79 @@ + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + + org.apache.flink + flink-metrics + 1.3-SNAPSHOT + .. + + +org.apache.flink +flink-metrics-datadog +1.3-SNAPSHOT + + + + org.apache.flink + flink-metrics-core + ${project.version} + provided + + + + com.squareup.okhttp3 + okhttp + 3.6.0 + + + + com.squareup.okio + okio + 1.11.0 + + + + com.fasterxml.jackson.core + jackson-databind --- End diff -- You should be able to set this to provided. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112301542 --- Diff: flink-metrics/flink-metrics-datadog/pom.xml --- @@ -0,0 +1,79 @@ + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + + org.apache.flink + flink-metrics + 1.3-SNAPSHOT + .. + + +org.apache.flink +flink-metrics-datadog +1.3-SNAPSHOT + --- End diff -- you should create a fat-jar using the maven-shade-plugin. If the okhttp and okio dependencies have significant dependencies as well these dependencies should eb shaded as well. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6008) collection of BlobServer improvements
[ https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975416#comment-15975416 ] ASF GitHub Bot commented on FLINK-6008: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/3512 Found a race between `BlobCache#deleteAll(JobID)` and `BlobCache#getURL(BlobKey)` now that the former is actually being used - this needs to be fixed first before merging: `BlobCache#deleteAll(JobID)` deletes the job directory which is only created at the start of `BlobCache#getURL(BlobKey)` which then relies on the directory being present. > collection of BlobServer improvements > - > > Key: FLINK-6008 > URL: https://issues.apache.org/jira/browse/FLINK-6008 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.3.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > The following things should be improved around the BlobServer/BlobCache: > * update config uptions with non-deprecated ones, e.g. > {{high-availability.cluster-id}} and {{high-availability.storageDir}} > * promote {{BlobStore#deleteAll(JobID)}} to the {{BlobService}} > * extend the {{BlobService}} to work with {{NAME_ADDRESSABLE}} blobs > (prepares FLINK-4399] > * remove {{NAME_ADDRESSABLE}} blobs after job/task termination > * do not fail the {{BlobServer}} when a delete operation fails > * code style, like using {{Preconditions.checkArgument}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3512: [FLINK-6008] collection of BlobServer improvements
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/3512 Found a race between `BlobCache#deleteAll(JobID)` and `BlobCache#getURL(BlobKey)` now that the former is actually being used - this needs to be fixed first before merging: `BlobCache#deleteAll(JobID)` deletes the job directory which is only created at the start of `BlobCache#getURL(BlobKey)` which then relies on the directory being present. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975415#comment-15975415 ] ASF GitHub Bot commented on FLINK-6311: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3738 LGTM after Stephan's comment on making `mainThread` variable `volatile` is addressed. Could you do that @zhangminglei? Once updated I'll proceed to merge this, thanks :) > NPE in FlinkKinesisConsumer if source was closed before run > --- > > Key: FLINK-6311 > URL: https://issues.apache.org/jira/browse/FLINK-6311 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: mingleizhang > > This was reported by an user: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html > The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected > against the condition when the source was closed before it started running. > Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3738: [FLINK-6311] [Kinesis Connector] NPE in FlinkKinesisConsu...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3738 LGTM after Stephan's comment on making `mainThread` variable `volatile` is addressed. Could you do that @zhangminglei? Once updated I'll proceed to merge this, thanks :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112300620 --- Diff: docs/monitoring/metrics.md --- @@ -436,6 +436,35 @@ metrics.reporter.stsd.port: 8125 {% endhighlight %} +### Datadog (org.apache.flink.metrics.datadog.DatadogHttpReporter) + +In order to use this reporter you must copy `/opt/flink-metrics-datadog-{{site.version}}.jar` into the `/lib` folder +of your Flink distribution. + +Parameters: + +- `apikey` - the Datadog API key +- `tags` - (optional) the global tags that will be applied to metrics when sending to Datadog. Tags should be separated by comma only + +Example configuration: + +{% highlight yaml %} + +metrics.reporters: dghttp +metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter +metrics.reporter.dghttp.apikey: xxx +metrics.reporter.dghttp.tags: myflinkapp,prod + +// , , , , , will be sent to Datadog as tags +metrics.scope.jm: .jobmanager --- End diff -- this section seems unnecessary --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112300266 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.datadog.utils.SerializationUtils; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Metric Reporter for Datadog + * + * Variables in metrics scope will be sent to Datadog as tags + * */ +public class DatadogHttpReporter implements MetricReporter, CharacterFilter, Scheduled { + private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class); + + // Both Flink's Gauge and Meter values are taken as gauge in Datadog + private final Mapgauges = new ConcurrentHashMap<>(); + private final Map counters = new ConcurrentHashMap<>(); + private final Map meters = new ConcurrentHashMap<>(); + + private DatadogHttpClient client; + private List configTags; + + public static final String API_KEY = "apikey"; + public static final String TAGS = "tags"; + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + final String name = group.getMetricIdentifier(metricName, this); + + List tags = new ArrayList<>(configTags); + tags.addAll(getTagsFromMetricGroup(group)); + + synchronized (this) { + if (metric instanceof Counter) { + Counter c = (Counter) metric; + counters.put(c, new DCounter(c, name, tags)); + } else if (metric instanceof Gauge) { + Gauge g = (Gauge) metric; + gauges.put(g, new DGauge(g, name, tags)); + } else if(metric instanceof Meter) { + Meter m = (Meter) metric; + // Only consider rate + meters.put(m, new DMeter(m, name, tags)); + } else if (metric instanceof Histogram) { + LOGGER.warn("Cannot add {} because Datadog HTTP API doesn't support Histogram", metricName); + } else { + LOGGER.warn("Cannot add unknown metric type {}. This indicates that the reporter " + + "does not support this metric type.", metric.getClass().getName()); + } + } + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + synchronized (this) { + if (metric instanceof Counter) { + counters.remove(metric); + } else if (metric instanceof Gauge) { + gauges.remove(metric); + } else if (metric instanceof Meter) { + meters.remove(metric); + } else if (metric
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112300022 --- Diff: flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpReporterTests.java --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class DatadogHttpReporterTests { + private static DatadogHttpReporter reporter; + + @BeforeClass --- End diff -- if there's only a single test we don't need a @BeforeClass method nor a static field; just introduce a local variable. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112299684 --- Diff: flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpReporterTests.java --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class DatadogHttpReporterTests { + private static DatadogHttpReporter reporter; + + @BeforeClass + public static void init() { + reporter = new DatadogHttpReporter(); + } + + @Test + public void testFilterChars() { + assertEquals("", reporter.filterCharacters("")); +assertEquals("abc", reporter.filterCharacters("abc")); --- End diff -- missing indentation --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112299390 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.RequestBody; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * Http client talking to Datadog + * */ +public class DatadogHttpClient{ + private static final String SERIES_URL_FORMAT = "https://app.datadoghq.com/api/v1/series?api_key=%s;; + private static final String VALIDATE_URL_FORMAT = "https://app.datadoghq.com/api/v1/validate?api_key=%s;; + private static final MediaType MEDIA_TYPE = MediaType.parse("application/json; charset=utf-8"); + private static final int TIMEOUT = 5; + + private final String seriesUrl; + private final String validateUrl; + private final OkHttpClient client; --- End diff -- you may want to add a close() method that shuts down the OkHttpClient as described in https://github.com/square/okhttp/blob/master/okhttp/src/main/java/okhttp3/OkHttpClient.java --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112298655 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.Meter; + +import java.util.List; + +/** + * Mapping of meter between Flink and Datadog + * + * Only consider rate of the meter, due to Datadog HTTP API's limited support of meter + * */ +public class DMeter extends DMetric { + private final Meter meter; --- End diff -- The indendation in this class appears to be different than in others. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112298482 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.datadog.utils.SerializationUtils; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Metric Reporter for Datadog + * + * Variables in metrics scope will be sent to Datadog as tags + * */ +public class DatadogHttpReporter implements MetricReporter, CharacterFilter, Scheduled { + private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class); + + // Both Flink's Gauge and Meter values are taken as gauge in Datadog + private final Mapgauges = new ConcurrentHashMap<>(); + private final Map counters = new ConcurrentHashMap<>(); + private final Map meters = new ConcurrentHashMap<>(); + + private DatadogHttpClient client; + private List configTags; + + public static final String API_KEY = "apikey"; + public static final String TAGS = "tags"; + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + final String name = group.getMetricIdentifier(metricName, this); + + List tags = new ArrayList<>(configTags); + tags.addAll(getTagsFromMetricGroup(group)); + + synchronized (this) { + if (metric instanceof Counter) { + Counter c = (Counter) metric; + counters.put(c, new DCounter(c, name, tags)); + } else if (metric instanceof Gauge) { + Gauge g = (Gauge) metric; + gauges.put(g, new DGauge(g, name, tags)); + } else if(metric instanceof Meter) { + Meter m = (Meter) metric; + // Only consider rate + meters.put(m, new DMeter(m, name, tags)); + } else if (metric instanceof Histogram) { + LOGGER.warn("Cannot add {} because Datadog HTTP API doesn't support Histogram", metricName); + } else { + LOGGER.warn("Cannot add unknown metric type {}. This indicates that the reporter " + + "does not support this metric type.", metric.getClass().getName()); + } + } + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + synchronized (this) { + if (metric instanceof Counter) { + counters.remove(metric); + } else if (metric instanceof Gauge) { + gauges.remove(metric); + } else if (metric instanceof Meter) { + meters.remove(metric); + } else if (metric
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112298348 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.datadog.utils.SerializationUtils; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Metric Reporter for Datadog + * + * Variables in metrics scope will be sent to Datadog as tags + * */ +public class DatadogHttpReporter implements MetricReporter, CharacterFilter, Scheduled { + private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class); + + // Both Flink's Gauge and Meter values are taken as gauge in Datadog + private final Mapgauges = new ConcurrentHashMap<>(); + private final Map counters = new ConcurrentHashMap<>(); + private final Map meters = new ConcurrentHashMap<>(); + + private DatadogHttpClient client; + private List configTags; + + public static final String API_KEY = "apikey"; + public static final String TAGS = "tags"; + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + final String name = group.getMetricIdentifier(metricName, this); + + List tags = new ArrayList<>(configTags); + tags.addAll(getTagsFromMetricGroup(group)); + + synchronized (this) { + if (metric instanceof Counter) { + Counter c = (Counter) metric; + counters.put(c, new DCounter(c, name, tags)); + } else if (metric instanceof Gauge) { + Gauge g = (Gauge) metric; + gauges.put(g, new DGauge(g, name, tags)); --- End diff -- You will run into `ClassCastExceptions` here. You have to check whether the Gauge returns a number before creating the `DGauge`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112298056 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.datadog.utils.SerializationUtils; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Metric Reporter for Datadog + * + * Variables in metrics scope will be sent to Datadog as tags + * */ +public class DatadogHttpReporter implements MetricReporter, CharacterFilter, Scheduled { + private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class); + + // Both Flink's Gauge and Meter values are taken as gauge in Datadog + private final Mapgauges = new ConcurrentHashMap<>(); + private final Map counters = new ConcurrentHashMap<>(); + private final Map meters = new ConcurrentHashMap<>(); + + private DatadogHttpClient client; + private List configTags; + + public static final String API_KEY = "apikey"; + public static final String TAGS = "tags"; + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + final String name = group.getMetricIdentifier(metricName, this); + + List tags = new ArrayList<>(configTags); + tags.addAll(getTagsFromMetricGroup(group)); + + synchronized (this) { --- End diff -- No need to synchronize since you use a `ConcurrentHashMap`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112297785 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.datadog.utils.SerializationUtils; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Metric Reporter for Datadog + * + * Variables in metrics scope will be sent to Datadog as tags + * */ +public class DatadogHttpReporter implements MetricReporter, CharacterFilter, Scheduled { + private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class); + + // Both Flink's Gauge and Meter values are taken as gauge in Datadog + private final Mapgauges = new ConcurrentHashMap<>(); + private final Map counters = new ConcurrentHashMap<>(); + private final Map meters = new ConcurrentHashMap<>(); + + private DatadogHttpClient client; + private List configTags; + + public static final String API_KEY = "apikey"; + public static final String TAGS = "tags"; + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + final String name = group.getMetricIdentifier(metricName, this); --- End diff -- Since you don't filter characters anyway you can use `MetricGroup#getMetricIdentifier(String name)`. Then you also don't need to imp0lement `CharacterFilter`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112296701 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.datadog.utils.TimestampUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * Abstract metric of Datadog for serialization + * */ +public abstract class DMetric { + private final String metric; // Metric name --- End diff -- So this is a requirement by datadog then? Which expects the name field to be called "metric"? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout
[ https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975318#comment-15975318 ] Aljoscha Krettek commented on FLINK-6315: - In the {{BucketingSink}}, we move "pending" files to "final" when we get a checkpoint complete notification. When restoring (from a failure or from a savepoint) we also move pending files that were confirmed by a checkpoint but have not been moved because we had a failure before receiving the completion notification to "final". TL:DR Either the notify or a restore does the move from "pending" to "final". > Notify on checkpoint timeout > - > > Key: FLINK-6315 > URL: https://issues.apache.org/jira/browse/FLINK-6315 > Project: Flink > Issue Type: New Feature > Components: Core >Reporter: Seth Wiesman >Assignee: Seth Wiesman > > A common use case when writing a custom operator that outputs data to some > third party location to partially output on checkpoint and then commit on > notifyCheckpointComplete. If that external system does not gracefully handle > rollbacks (such as Amazon S3 not allowing consistent delete operations) then > that data needs to be handled by the next checkpoint. > The idea is to add a new interface similar to CheckpointListener that > provides a callback when the CheckpointCoordinator timesout a checkpoint > {code:java} > /** > * This interface must be implemented by functions/operations that want to > receive > * a notification if a checkpoint has been {@link > org.apache.flink.runtime.checkpoint.CheckpointCoordinator} > */ > public interface CheckpointTimeoutListener { > /** >* This method is called as a notification if a distributed checkpoint > has been timed out. >* >* @param checkpointId The ID of the checkpoint that has been timed out. >* @throws Exception >*/ > void notifyCheckpointTimeout(long checkpointId) throws Exception; > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3736: [Flink-6013][metrics] Add Datadog HTTP metrics reporter
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/3736 Addressed @zentol and @StephanEwen 's comments. Ready for another round! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Comment Edited] (FLINK-3328) Incorrectly shaded dependencies in flink-runtime
[ https://issues.apache.org/jira/browse/FLINK-3328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975268#comment-15975268 ] Luke Hutchison edited comment on FLINK-3328 at 4/19/17 6:43 PM: [~StephanEwen] It's only not a problem if you know that in every case, the version of every single shared library will be the same between all Flink jars. And even if you know that will be the case, it's hardly ideal space-wise, because you end up pulling in the same dependencies multiple times for every Flink project, until a fat jar is built. In general, there is a good reason for that Maven warning, because if there's any way that multiple different versions of a dep can occur, you can trigger bugs that are maddeningly hard to trace. If every Flink jar (like {{flink-runtime}}) depends on one of the core Flink jars (e.g. {{flink-core}}), couldn't all the shared dependencies just be put in that one jar? was (Author: lukehutch): [~StephanEwen] It's only not a problem if you know that in every case, the version of every single shared library will be the same between all Flink jars. And even if you know that will be the case, it's hardly ideal space-wise, because you end up pulling in the same dependencies multiple times for every Flink project, until a fat jar is built. In general, there is a good reason for that Maven warning, because if there's any way that multiple different versions of a dep can occur, you can trigger bugs that are maddeningly hard to trace. If every Flink jar (like `flink-runtime`) depends on one of the core Flink jars (e.g. `flink-core`), couldn't all the shared dependencies just be put in that one jar? > Incorrectly shaded dependencies in flink-runtime > > > Key: FLINK-3328 > URL: https://issues.apache.org/jira/browse/FLINK-3328 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Robert Metzger >Priority: Blocker > Fix For: 1.0.0 > > > There are apparently some dependencies shaded into {{flink-runtime}} fat jar > that are not relocated. (the flink-runtime jar is now 70 MB) > From the output of the shading in flink-dist, it looks as if this concerns at > least > - Zookeeper > - slf4j > - jline > - netty (3.x) > Possible more. > {code} > [WARNING] zookeeper-3.4.6.jar, flink-runtime_2.10-1.0-SNAPSHOT.jar define 440 > overlapping classes: > [WARNING] - org.apache.zookeeper.server.NettyServerCnxnFactory > [WARNING] - org.apache.jute.compiler.JFile > [WARNING] - org.apache.zookeeper.server.SessionTracker$Session > [WARNING] - org.apache.zookeeper.server.quorum.AuthFastLeaderElection$1 > [WARNING] - org.apache.jute.compiler.JLong > [WARNING] - org.apache.zookeeper.client.ZooKeeperSaslClient$SaslState > [WARNING] - org.apache.zookeeper.server.auth.KerberosName$Rule > [WARNING] - org.apache.jute.CsvOutputArchive > [WARNING] - org.apache.zookeeper.server.quorum.QuorumPeer > [WARNING] - org.apache.zookeeper.ZooKeeper$DataWatchRegistration > [WARNING] - 430 more... > [WARNING] slf4j-api-1.7.7.jar, flink-runtime_2.10-1.0-SNAPSHOT.jar define 24 > overlapping classes: > [WARNING] - org.slf4j.spi.MarkerFactoryBinder > [WARNING] - org.slf4j.helpers.SubstituteLogger > [WARNING] - org.slf4j.helpers.BasicMarker > [WARNING] - org.slf4j.helpers.Util > [WARNING] - org.slf4j.LoggerFactory > [WARNING] - org.slf4j.Marker > [WARNING] - org.slf4j.helpers.NamedLoggerBase > [WARNING] - org.slf4j.Logger > [WARNING] - org.slf4j.spi.LocationAwareLogger > [WARNING] - org.slf4j.ILoggerFactory > [WARNING] - 14 more... > [WARNING] jansi-1.4.jar, jline-2.10.4.jar define 23 overlapping classes: > [WARNING] - org.fusesource.jansi.Ansi$Erase > [WARNING] - org.fusesource.jansi.Ansi > [WARNING] - org.fusesource.jansi.AnsiOutputStream > [WARNING] - org.fusesource.jansi.internal.CLibrary > [WARNING] - org.fusesource.jansi.Ansi$2 > [WARNING] - org.fusesource.jansi.WindowsAnsiOutputStream > [WARNING] - org.fusesource.jansi.AnsiRenderer$Code > [WARNING] - org.fusesource.jansi.AnsiConsole > [WARNING] - org.fusesource.jansi.Ansi$Attribute > [WARNING] - org.fusesource.jansi.internal.Kernel32 > [WARNING] - 13 more... > [WARNING] commons-beanutils-core-1.8.0.jar, commons-collections-3.2.2.jar, > commons-beanutils-1.7.0.jar define 10 overlapping classes: > [WARNING] - org.apache.commons.collections.FastHashMap$EntrySet > [WARNING] - org.apache.commons.collections.ArrayStack > [WARNING] - org.apache.commons.collections.FastHashMap$1 > [WARNING] - org.apache.commons.collections.FastHashMap$KeySet > [WARNING] - org.apache.commons.collections.FastHashMap$CollectionView > [WARNING] -
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112283099 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/metric/DGauge.java --- @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog.metric; + +import java.util.List; + +/** + * Mapping of gauge between Flink and Datadog + * */ +public class DGauge extends DMetric { --- End diff -- yes, see my explanation below --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3328) Incorrectly shaded dependencies in flink-runtime
[ https://issues.apache.org/jira/browse/FLINK-3328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975268#comment-15975268 ] Luke Hutchison commented on FLINK-3328: --- [~StephanEwen] It's only not a problem if you know that in every case, the version of every single shared library will be the same between all Flink jars. And even if you know that will be the case, it's hardly ideal space-wise, because you end up pulling in the same dependencies multiple times for every Flink project, until a fat jar is built. In general, there is a good reason for that Maven warning, because if there's any way that multiple different versions of a dep can occur, you can trigger bugs that are maddeningly hard to trace. If every Flink jar (like `flink-runtime`) depends on one of the core Flink jars (e.g. `flink-core`), couldn't all the shared dependencies just be put in that one jar? > Incorrectly shaded dependencies in flink-runtime > > > Key: FLINK-3328 > URL: https://issues.apache.org/jira/browse/FLINK-3328 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Robert Metzger >Priority: Blocker > Fix For: 1.0.0 > > > There are apparently some dependencies shaded into {{flink-runtime}} fat jar > that are not relocated. (the flink-runtime jar is now 70 MB) > From the output of the shading in flink-dist, it looks as if this concerns at > least > - Zookeeper > - slf4j > - jline > - netty (3.x) > Possible more. > {code} > [WARNING] zookeeper-3.4.6.jar, flink-runtime_2.10-1.0-SNAPSHOT.jar define 440 > overlapping classes: > [WARNING] - org.apache.zookeeper.server.NettyServerCnxnFactory > [WARNING] - org.apache.jute.compiler.JFile > [WARNING] - org.apache.zookeeper.server.SessionTracker$Session > [WARNING] - org.apache.zookeeper.server.quorum.AuthFastLeaderElection$1 > [WARNING] - org.apache.jute.compiler.JLong > [WARNING] - org.apache.zookeeper.client.ZooKeeperSaslClient$SaslState > [WARNING] - org.apache.zookeeper.server.auth.KerberosName$Rule > [WARNING] - org.apache.jute.CsvOutputArchive > [WARNING] - org.apache.zookeeper.server.quorum.QuorumPeer > [WARNING] - org.apache.zookeeper.ZooKeeper$DataWatchRegistration > [WARNING] - 430 more... > [WARNING] slf4j-api-1.7.7.jar, flink-runtime_2.10-1.0-SNAPSHOT.jar define 24 > overlapping classes: > [WARNING] - org.slf4j.spi.MarkerFactoryBinder > [WARNING] - org.slf4j.helpers.SubstituteLogger > [WARNING] - org.slf4j.helpers.BasicMarker > [WARNING] - org.slf4j.helpers.Util > [WARNING] - org.slf4j.LoggerFactory > [WARNING] - org.slf4j.Marker > [WARNING] - org.slf4j.helpers.NamedLoggerBase > [WARNING] - org.slf4j.Logger > [WARNING] - org.slf4j.spi.LocationAwareLogger > [WARNING] - org.slf4j.ILoggerFactory > [WARNING] - 14 more... > [WARNING] jansi-1.4.jar, jline-2.10.4.jar define 23 overlapping classes: > [WARNING] - org.fusesource.jansi.Ansi$Erase > [WARNING] - org.fusesource.jansi.Ansi > [WARNING] - org.fusesource.jansi.AnsiOutputStream > [WARNING] - org.fusesource.jansi.internal.CLibrary > [WARNING] - org.fusesource.jansi.Ansi$2 > [WARNING] - org.fusesource.jansi.WindowsAnsiOutputStream > [WARNING] - org.fusesource.jansi.AnsiRenderer$Code > [WARNING] - org.fusesource.jansi.AnsiConsole > [WARNING] - org.fusesource.jansi.Ansi$Attribute > [WARNING] - org.fusesource.jansi.internal.Kernel32 > [WARNING] - 13 more... > [WARNING] commons-beanutils-core-1.8.0.jar, commons-collections-3.2.2.jar, > commons-beanutils-1.7.0.jar define 10 overlapping classes: > [WARNING] - org.apache.commons.collections.FastHashMap$EntrySet > [WARNING] - org.apache.commons.collections.ArrayStack > [WARNING] - org.apache.commons.collections.FastHashMap$1 > [WARNING] - org.apache.commons.collections.FastHashMap$KeySet > [WARNING] - org.apache.commons.collections.FastHashMap$CollectionView > [WARNING] - org.apache.commons.collections.BufferUnderflowException > [WARNING] - org.apache.commons.collections.Buffer > [WARNING] - > org.apache.commons.collections.FastHashMap$CollectionView$CollectionViewIterator > [WARNING] - org.apache.commons.collections.FastHashMap$Values > [WARNING] - org.apache.commons.collections.FastHashMap > [WARNING] flink-streaming-scala_2.10-1.0-SNAPSHOT.jar, > flink-core-1.0-SNAPSHOT.jar, flink-runtime_2.10-1.0-SNAPSHOT.jar, > flink-java-1.0-SNAPSHOT.jar, flink-streaming-java_2.10-1.0-SNAPSHOT.jar, > flink-scala_2.10-1.0-SNAPSHOT.jar, flink-clients_2.10-1.0-SNAPSHOT.jar, > flink-optimizer_2.10-1.0-SNAPSHOT.jar, > flink-runtime-web_2.10-1.0-SNAPSHOT.jar define 1690 overlapping classes: > [WARNING] - > org.apache.flink.shaded.com.google.common.collect.LinkedListMultimap > [WARNING]
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112282965 --- Diff: flink-metrics/flink-metrics-datadog/pom.xml --- @@ -0,0 +1,85 @@ + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + + org.apache.flink + flink-metrics + 1.3-SNAPSHOT + .. + + +org.apache.flink +flink-metrics-datadog +1.3-SNAPSHOT + + + + org.apache.flink + flink-metrics-core + ${project.version} + provided + + + + com.google.guava --- End diff -- I removed guava dependency as Stephan proposed --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6103) LocalFileSystem rename() uses File.renameTo()
[ https://issues.apache.org/jira/browse/FLINK-6103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975260#comment-15975260 ] ASF GitHub Bot commented on FLINK-6103: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3598 I would do the following: - log nothing - Catch the errors that are regular "move failed" exceptions and return false. - `FileNotFoundException` - `DirectoryNotEmptyException` - `SecurityException` - Let all other `IOExceptions` bubble out > LocalFileSystem rename() uses File.renameTo() > - > > Key: FLINK-6103 > URL: https://issues.apache.org/jira/browse/FLINK-6103 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Reporter: Flavio Pompermaier >Assignee: Flavio Pompermaier > Labels: filesystem > > I've tried to move a directory to another on the LocalFilesystem and it > doesn't work (in my case fs is an instance of java.io.UnixFileSystem). > As for Flink-1840 (there was a PR to fix the issue - > https://github.com/apache/flink/pull/578) the problem is that > {{File.renameTo()}} is not reliable. > Indeed, the Javadoc says: > bq. Renames the file denoted by this abstract pathname. Many aspects of the > behavior of this method are inherently platform-dependent: The rename > operation might not be able to move a file from one filesystem to another, it > might not be atomic, and it might not succeed if a file with the destination > abstract pathname already exists. The return value should always be checked > to make sure that the rename operation was successful. Note that the > java.nio.file.Files class defines the move method to move or rename a file in > a platform independent manner -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3598: [FLINK-6103] LocalFileSystem rename() uses File.renameTo(...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3598 I would do the following: - log nothing - Catch the errors that are regular "move failed" exceptions and return false. - `FileNotFoundException` - `DirectoryNotEmptyException` - `SecurityException` - Let all other `IOExceptions` bubble out --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3709 @WangTaoTheTonic The purpose of the cache is to reduce queries to the JobManager; and since the state of the job is available through the ExecutionGraph the cache still fulfills its purpose. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6295) use LoadingCache instead of WeakHashMap to lower latency
[ https://issues.apache.org/jira/browse/FLINK-6295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975259#comment-15975259 ] ASF GitHub Bot commented on FLINK-6295: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3709 @WangTaoTheTonic The purpose of the cache is to reduce queries to the JobManager; and since the state of the job is available through the ExecutionGraph the cache still fulfills its purpose. > use LoadingCache instead of WeakHashMap to lower latency > > > Key: FLINK-6295 > URL: https://issues.apache.org/jira/browse/FLINK-6295 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Reporter: Tao Wang >Assignee: Tao Wang > > Now in ExecutionGraphHolder, which is used in many handlers, we use a > WeakHashMap to cache ExecutionGraph(s), which is only sensitive to garbage > collection. > The latency is too high when JVM do GC rarely, which will make status of jobs > or its tasks unmatched with the real ones. > LoadingCache is a common used cache implementation from guava lib, we can use > its time based eviction to lower latency of status update. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112280185 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java --- @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.datadog.utils.SerializationUtils; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.Scheduled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + + +/** + * Abbr: dghttp --- End diff -- This is reminder for flink-conf.yaml when users choose to use this reporter. ``` metrics.reporters: dghttp ``` I'll add more comment here --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/3736#discussion_r112279814 --- Diff: flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.datadog; + +import org.apache.flink.metrics.datadog.utils.TimestampUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * Abstract metric of Datadog for serialization + * */ +public abstract class DMetric { + private final String metric; // Metric name --- End diff -- 'metric' is a field name in serialized json, and Jackson takes the name here during serialization. Thus, I can't rename it to something else. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6103) LocalFileSystem rename() uses File.renameTo()
[ https://issues.apache.org/jira/browse/FLINK-6103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975242#comment-15975242 ] ASF GitHub Bot commented on FLINK-6103: --- Github user fpompermaier commented on the issue: https://github.com/apache/flink/pull/3598 Of course..but how should I handle them? should I catch just one exception? What should I log? > LocalFileSystem rename() uses File.renameTo() > - > > Key: FLINK-6103 > URL: https://issues.apache.org/jira/browse/FLINK-6103 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Reporter: Flavio Pompermaier >Assignee: Flavio Pompermaier > Labels: filesystem > > I've tried to move a directory to another on the LocalFilesystem and it > doesn't work (in my case fs is an instance of java.io.UnixFileSystem). > As for Flink-1840 (there was a PR to fix the issue - > https://github.com/apache/flink/pull/578) the problem is that > {{File.renameTo()}} is not reliable. > Indeed, the Javadoc says: > bq. Renames the file denoted by this abstract pathname. Many aspects of the > behavior of this method are inherently platform-dependent: The rename > operation might not be able to move a file from one filesystem to another, it > might not be atomic, and it might not succeed if a file with the destination > abstract pathname already exists. The return value should always be checked > to make sure that the rename operation was successful. Note that the > java.nio.file.Files class defines the move method to move or rename a file in > a platform independent manner -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3598: [FLINK-6103] LocalFileSystem rename() uses File.renameTo(...
Github user fpompermaier commented on the issue: https://github.com/apache/flink/pull/3598 Of course..but how should I handle them? should I catch just one exception? What should I log? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Closed] (FLINK-5338) Make printing sinks non-parallel
[ https://issues.apache.org/jira/browse/FLINK-5338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-5338. --- Resolution: Won't Fix Fix Version/s: (was: 2.0.0) > Make printing sinks non-parallel > > > Key: FLINK-5338 > URL: https://issues.apache.org/jira/browse/FLINK-5338 > Project: Flink > Issue Type: Sub-task > Components: DataStream API >Reporter: Stephan Ewen >Priority: Minor > > This is a suggestion, up for discussion. > I suggest to make the {{DataStream::print()}} sink function inherently > non-parallel, i.e., always executed in a single parallel sink. > Since printing is only for toy setups anyways, and the performance is usually > bound by the stdout stream, non-parallel prints should be sufficient. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5338) Make printing sinks non-parallel
[ https://issues.apache.org/jira/browse/FLINK-5338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975235#comment-15975235 ] Stephan Ewen commented on FLINK-5338: - I am okay with closing it... > Make printing sinks non-parallel > > > Key: FLINK-5338 > URL: https://issues.apache.org/jira/browse/FLINK-5338 > Project: Flink > Issue Type: Sub-task > Components: DataStream API >Reporter: Stephan Ewen >Priority: Minor > > This is a suggestion, up for discussion. > I suggest to make the {{DataStream::print()}} sink function inherently > non-parallel, i.e., always executed in a single parallel sink. > Since printing is only for toy setups anyways, and the performance is usually > bound by the stdout stream, non-parallel prints should be sufficient. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3328) Incorrectly shaded dependencies in flink-runtime
[ https://issues.apache.org/jira/browse/FLINK-3328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975227#comment-15975227 ] Stephan Ewen commented on FLINK-3328: - That is not a problem. Guava and asm must be in each artifact ({{flink-core}}, {{flink-runtime}}, etc) to make them self contained. When assembling the final jar, they all have guava, so you have an overlap of the exact same classes. The alternative is to relocate it to a different namespace in each artifact, but then we get guava 10 times into the {{flink-dist}}, and that is not very desirable. > Incorrectly shaded dependencies in flink-runtime > > > Key: FLINK-3328 > URL: https://issues.apache.org/jira/browse/FLINK-3328 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Robert Metzger >Priority: Blocker > Fix For: 1.0.0 > > > There are apparently some dependencies shaded into {{flink-runtime}} fat jar > that are not relocated. (the flink-runtime jar is now 70 MB) > From the output of the shading in flink-dist, it looks as if this concerns at > least > - Zookeeper > - slf4j > - jline > - netty (3.x) > Possible more. > {code} > [WARNING] zookeeper-3.4.6.jar, flink-runtime_2.10-1.0-SNAPSHOT.jar define 440 > overlapping classes: > [WARNING] - org.apache.zookeeper.server.NettyServerCnxnFactory > [WARNING] - org.apache.jute.compiler.JFile > [WARNING] - org.apache.zookeeper.server.SessionTracker$Session > [WARNING] - org.apache.zookeeper.server.quorum.AuthFastLeaderElection$1 > [WARNING] - org.apache.jute.compiler.JLong > [WARNING] - org.apache.zookeeper.client.ZooKeeperSaslClient$SaslState > [WARNING] - org.apache.zookeeper.server.auth.KerberosName$Rule > [WARNING] - org.apache.jute.CsvOutputArchive > [WARNING] - org.apache.zookeeper.server.quorum.QuorumPeer > [WARNING] - org.apache.zookeeper.ZooKeeper$DataWatchRegistration > [WARNING] - 430 more... > [WARNING] slf4j-api-1.7.7.jar, flink-runtime_2.10-1.0-SNAPSHOT.jar define 24 > overlapping classes: > [WARNING] - org.slf4j.spi.MarkerFactoryBinder > [WARNING] - org.slf4j.helpers.SubstituteLogger > [WARNING] - org.slf4j.helpers.BasicMarker > [WARNING] - org.slf4j.helpers.Util > [WARNING] - org.slf4j.LoggerFactory > [WARNING] - org.slf4j.Marker > [WARNING] - org.slf4j.helpers.NamedLoggerBase > [WARNING] - org.slf4j.Logger > [WARNING] - org.slf4j.spi.LocationAwareLogger > [WARNING] - org.slf4j.ILoggerFactory > [WARNING] - 14 more... > [WARNING] jansi-1.4.jar, jline-2.10.4.jar define 23 overlapping classes: > [WARNING] - org.fusesource.jansi.Ansi$Erase > [WARNING] - org.fusesource.jansi.Ansi > [WARNING] - org.fusesource.jansi.AnsiOutputStream > [WARNING] - org.fusesource.jansi.internal.CLibrary > [WARNING] - org.fusesource.jansi.Ansi$2 > [WARNING] - org.fusesource.jansi.WindowsAnsiOutputStream > [WARNING] - org.fusesource.jansi.AnsiRenderer$Code > [WARNING] - org.fusesource.jansi.AnsiConsole > [WARNING] - org.fusesource.jansi.Ansi$Attribute > [WARNING] - org.fusesource.jansi.internal.Kernel32 > [WARNING] - 13 more... > [WARNING] commons-beanutils-core-1.8.0.jar, commons-collections-3.2.2.jar, > commons-beanutils-1.7.0.jar define 10 overlapping classes: > [WARNING] - org.apache.commons.collections.FastHashMap$EntrySet > [WARNING] - org.apache.commons.collections.ArrayStack > [WARNING] - org.apache.commons.collections.FastHashMap$1 > [WARNING] - org.apache.commons.collections.FastHashMap$KeySet > [WARNING] - org.apache.commons.collections.FastHashMap$CollectionView > [WARNING] - org.apache.commons.collections.BufferUnderflowException > [WARNING] - org.apache.commons.collections.Buffer > [WARNING] - > org.apache.commons.collections.FastHashMap$CollectionView$CollectionViewIterator > [WARNING] - org.apache.commons.collections.FastHashMap$Values > [WARNING] - org.apache.commons.collections.FastHashMap > [WARNING] flink-streaming-scala_2.10-1.0-SNAPSHOT.jar, > flink-core-1.0-SNAPSHOT.jar, flink-runtime_2.10-1.0-SNAPSHOT.jar, > flink-java-1.0-SNAPSHOT.jar, flink-streaming-java_2.10-1.0-SNAPSHOT.jar, > flink-scala_2.10-1.0-SNAPSHOT.jar, flink-clients_2.10-1.0-SNAPSHOT.jar, > flink-optimizer_2.10-1.0-SNAPSHOT.jar, > flink-runtime-web_2.10-1.0-SNAPSHOT.jar define 1690 overlapping classes: > [WARNING] - > org.apache.flink.shaded.com.google.common.collect.LinkedListMultimap > [WARNING] - > org.apache.flink.shaded.com.google.common.io.ByteSource$AsCharSource > [WARNING] - org.apache.flink.shaded.com.google.common.escape.Platform > [WARNING] - > org.apache.flink.shaded.com.google.common.util.concurrent.Futures$ImmediateFailedCheckedFuture > [WARNING] - >
[jira] [Commented] (FLINK-5623) TempBarrier dam has been closed
[ https://issues.apache.org/jira/browse/FLINK-5623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975220#comment-15975220 ] Stephan Ewen commented on FLINK-5623: - I think the issue is that the "pipeline breaker" is not properly reset. The logic closes it, but then on re-creation re-references it. The fix could be to add to {{BatchTask}}, line 887 a {code} ... this.tempBarriers[i].close(); this.tempBarriers[i] = null; // <<= this is the new line {code} > TempBarrier dam has been closed > --- > > Key: FLINK-5623 > URL: https://issues.apache.org/jira/browse/FLINK-5623 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.3.0 >Reporter: Greg Hogan > > PageRank (see PR from FLINK-4896) results in the following error. Can be > reproduced by changing {{AsmTestBase:63}} to {{env = > executionEnvironment.createLocalEnvironment();}} then running > {{PageRankTest}} (fails for Simple and RMat graph tests, succeeds for > Complete graph test). > {noformat} > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Job execution failed. > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427) > at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:387) > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62) > at > org.apache.flink.graph.asm.dataset.AbstractDataSetAnalytic.execute(AbstractDataSetAnalytic.java:55) > at org.apache.flink.graph.drivers.PageRank.print(PageRank.java:113) > at org.apache.flink.graph.Runner.main(Runner.java:257) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339) > at > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:834) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1076) > at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1123) > at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120) > at > org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1120) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:905) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:848) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:848) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.RuntimeException: An error occurred creating the temp > table. > at >
[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout
[ https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975203#comment-15975203 ] Stephan Ewen commented on FLINK-6315: - I think you are thinking about it the right way. When checkpoint 2 does not happen for whatever reason then checkpoint 3 should be in charge of everything since the last successful checkpoint. I see the problem now: When checkpoint 3 starts, you may not yet know whether checkpoint 2 is actually going to complete. To make it more tricky, it may actually be that checkpoint 2 fails (due to a timeout) after checkpoint 3 completes. In the incremental checkpointing code, we have a similar problem. In that case, we can only re-reference a diff if it is part of a completed checkpoint. If for example checkpoint 2 is not complete when checkpoint 3 is started, then checkpoint 3 builds on checkpoint 1, not on checkpoint 2. [~aljoscha] How is that handled in the regular bucketing sink? > Notify on checkpoint timeout > - > > Key: FLINK-6315 > URL: https://issues.apache.org/jira/browse/FLINK-6315 > Project: Flink > Issue Type: New Feature > Components: Core >Reporter: Seth Wiesman >Assignee: Seth Wiesman > > A common use case when writing a custom operator that outputs data to some > third party location to partially output on checkpoint and then commit on > notifyCheckpointComplete. If that external system does not gracefully handle > rollbacks (such as Amazon S3 not allowing consistent delete operations) then > that data needs to be handled by the next checkpoint. > The idea is to add a new interface similar to CheckpointListener that > provides a callback when the CheckpointCoordinator timesout a checkpoint > {code:java} > /** > * This interface must be implemented by functions/operations that want to > receive > * a notification if a checkpoint has been {@link > org.apache.flink.runtime.checkpoint.CheckpointCoordinator} > */ > public interface CheckpointTimeoutListener { > /** >* This method is called as a notification if a distributed checkpoint > has been timed out. >* >* @param checkpointId The ID of the checkpoint that has been timed out. >* @throws Exception >*/ > void notifyCheckpointTimeout(long checkpointId) throws Exception; > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5481) Simplify Row creation
[ https://issues.apache.org/jira/browse/FLINK-5481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975138#comment-15975138 ] ASF GitHub Bot commented on FLINK-5481: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3127 Looks good, thank you! Merging this... > Simplify Row creation > - > > Key: FLINK-5481 > URL: https://issues.apache.org/jira/browse/FLINK-5481 > Project: Flink > Issue Type: Bug > Components: DataSet API, Table API & SQL >Affects Versions: 1.2.0 >Reporter: Anton Solovev >Assignee: Anton Solovev >Priority: Trivial > > When we use {{ExecutionEnvironment#fromElements(X... data)}} it takes first > element of {{data}} to define a type. If first Row in collection has wrong > number of fields (there are nulls) TypeExtractor returns not RowTypeInfo, but > GenericType -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3127: [FLINK-5481] Simplify Row creation
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3127 Looks good, thank you! Merging this... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6312) Update curator version to 2.12.0
[ https://issues.apache.org/jira/browse/FLINK-6312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975121#comment-15975121 ] ASF GitHub Bot commented on FLINK-6312: --- Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3727 Sure. Seems like it will take a little long time but i'll try my best :) > Update curator version to 2.12.0 > > > Key: FLINK-6312 > URL: https://issues.apache.org/jira/browse/FLINK-6312 > Project: Flink > Issue Type: Bug > Components: JobManager >Reporter: Tao Wang >Assignee: Tao Wang > > As there's a Major bug(https://issues.apache.org/jira/browse/CURATOR-344) in > curator release used by flink, we need to update the release to 2.12.0 to > avoid potential block in flink. (flink use recipes in checkpoint coordinator > and we have already occurred problem in zookeeper failover when we're trying > to fix https://issues.apache.org/jira/browse/FLINK-6174) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3727: [FLINK-6312]update curator version to 2.12.0 to avoid pot...
Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3727 Sure. Seems like it will take a little long time but i'll try my best :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Closed] (FLINK-6149) add additional flink logical relation nodes
[ https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dmytro Shkvyra closed FLINK-6149. - Resolution: Fixed Ok [~ykt836], I will close it. Thanks for clarification. > add additional flink logical relation nodes > --- > > Key: FLINK-6149 > URL: https://issues.apache.org/jira/browse/FLINK-6149 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kurt Young >Assignee: Kurt Young > Fix For: 1.3.0 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6295) use LoadingCache instead of WeakHashMap to lower latency
[ https://issues.apache.org/jira/browse/FLINK-6295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975118#comment-15975118 ] ASF GitHub Bot commented on FLINK-6295: --- Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3709 That means every time EGHolder received a request, it will check if the job status in request is suspended or not, right? This will make cache in EGHolder unmeaningful. > use LoadingCache instead of WeakHashMap to lower latency > > > Key: FLINK-6295 > URL: https://issues.apache.org/jira/browse/FLINK-6295 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Reporter: Tao Wang >Assignee: Tao Wang > > Now in ExecutionGraphHolder, which is used in many handlers, we use a > WeakHashMap to cache ExecutionGraph(s), which is only sensitive to garbage > collection. > The latency is too high when JVM do GC rarely, which will make status of jobs > or its tasks unmatched with the real ones. > LoadingCache is a common used cache implementation from guava lib, we can use > its time based eviction to lower latency of status update. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...
Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3709 That means every time EGHolder received a request, it will check if the job status in request is suspended or not, right? This will make cache in EGHolder unmeaningful. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5256) Extend DataSetSingleRowJoin to support Left and Right joins
[ https://issues.apache.org/jira/browse/FLINK-5256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975114#comment-15975114 ] ASF GitHub Bot commented on FLINK-5256: --- Github user DmytroShkvyra commented on the issue: https://github.com/apache/flink/pull/3673 Hi @fhueske could you review this variant. I had to removed tests with non-equality predicates (https://issues.apache.org/jira/browse/FLINK-5520) > Extend DataSetSingleRowJoin to support Left and Right joins > --- > > Key: FLINK-5256 > URL: https://issues.apache.org/jira/browse/FLINK-5256 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Fabian Hueske >Assignee: Dmytro Shkvyra > > The {{DataSetSingleRowJoin}} is a broadcast-map join that supports arbitrary > inner joins where one input is a single row. > I found that Calcite translates certain subqueries into non-equi left and > right joins with single input. These cases can be handled if the > {{DataSetSingleRowJoin}} is extended to support outer joins on the > non-single-row input, i.e., left joins if the right side is single input and > vice versa. -- This message was sent by Atlassian JIRA (v6.3.15#6346)