[GitHub] flink issue #3736: [Flink-6013][metrics] Add Datadog HTTP metrics reporter

2017-04-19 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/3736
  
@zentol ready for another round. The build fails because a few unrelated 
flaky tests are timing out in the Travis build.




[GitHub] flink issue #3738: [FLINK-6311] [Kinesis Connector] NPE in FlinkKinesisConsu...

2017-04-19 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/3738
  
@tzulitai You are very welcome. It is my pleasure ~




[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976114#comment-15976114
 ] 

ASF GitHub Bot commented on FLINK-6311:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/3738
  
@tzulitai You are very welcome. It is my pleasure ~


> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: mingleizhang
>
> This was reported by a user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected 
> against the case where the source is closed before it starts running. 
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.
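
For illustration, a minimal sketch of the guard described above, with 
simplified field types (this is not the actual {{KinesisDataFetcher}} source):

{code}
import java.util.concurrent.ExecutorService;

// Hypothetical sketch: shutdownFetcher() can run before run() ever started,
// so the fields that run() initializes may still be null.
class FetcherShutdownSketch {
	private volatile boolean running = true;
	private volatile Thread mainThread;                       // set in run()
	private volatile ExecutorService shardConsumersExecutor;  // set in run()

	public void shutdownFetcher() {
		running = false;

		// Guard against close-before-run: both fields may still be null.
		if (mainThread != null) {
			mainThread.interrupt(); // the thread may be blocked on I/O
		}
		if (shardConsumersExecutor != null) {
			shardConsumersExecutor.shutdownNow();
		}
	}
}
{code}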





[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976110#comment-15976110
 ] 

ASF GitHub Bot commented on FLINK-6311:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3738
  
Thanks @zhangminglei. LGTM!

Merging to {{master}} and {{release-1.2}} (will merge a bit later today ;))


> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: mingleizhang
>
> This was reported by a user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected 
> against the case where the source is closed before it starts running. 
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.





[GitHub] flink issue #3738: [FLINK-6311] [Kinesis Connector] NPE in FlinkKinesisConsu...

2017-04-19 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3738
  
Thanks @zhangminglei. LGTM!

Merging to {{master}} and {{release-1.2}} (will merge a bit later today ;))




[GitHub] flink issue #3738: [FLINK-6311] [Kinesis Connector] NPE in FlinkKinesisConsu...

2017-04-19 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/3738
  
@tzulitai Hi, I have updated the code. Please check it out. Thanks, I 
appreciate it.




[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976106#comment-15976106
 ] 

ASF GitHub Bot commented on FLINK-6311:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/3738
  
@tzulitai Hi, I have updated the code. Please check it out. Thanks, I 
appreciate it.


> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: mingleizhang
>
> This was reported by a user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected 
> against the case where the source is closed before it starts running. 
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.





[jira] [Commented] (FLINK-6264) Kafka consumer fails if can't find leader for partition

2017-04-19 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976105#comment-15976105
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-6264:


[~autoaim800] I think the exception occurs when there are some partitions whose 
leader cannot be found, so it isn't the case that you described.

I think what [~gyfora] is suggesting is that those partitions should be retried 
instead of failing? Gyula, could you clarify?

> Kafka consumer fails if can't find leader for partition
> ---
>
> Key: FLINK-6264
> URL: https://issues.apache.org/jira/browse/FLINK-6264
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>
> We have observed the following error many times when brokers failed/were 
> restarted:
> java.lang.RuntimeException: Unable to find a leader for partitions: 
> [Partition: KafkaTopicPartition{topic='mytopic', partition=10}, 
> KafkaPartitionHandle=[mytopic,10], offset=-1]
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.findLeaderForPartitions(Kafka08Fetcher.java:474)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.runFetchLoop(Kafka08Fetcher.java:194)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:256)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:656)
>   at java.lang.Thread.run(Thread.java:745)





[jira] [Commented] (FLINK-6303) Documentation support build in docker on OSX

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976031#comment-15976031
 ] 

ASF GitHub Bot commented on FLINK-6303:
---

Github user mtunique commented on the issue:

https://github.com/apache/flink/pull/3719
  
I will follow the branch.


> Documentation support build in docker on OSX
> 
>
> Key: FLINK-6303
> URL: https://issues.apache.org/jira/browse/FLINK-6303
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Tao Meng
>Assignee: Tao Meng
>Priority: Trivial
>
> Currently the Docker-based docs build only works on Linux, because {{-v 
> "/home/$\{USER_NAME\}:/home/$\{USER_NAME\}"}} is Linux-only. We need to 
> change {{/home/$\{USER_NAME\}}} to {{$\{HOME\}}}.





[GitHub] flink issue #3719: [FLINK-6303] Documentation support build in docker on OSX

2017-04-19 Thread mtunique
Github user mtunique commented on the issue:

https://github.com/apache/flink/pull/3719
  
I will follow the branch.




[jira] [Commented] (FLINK-6264) Kafka consumer fails if can't find leader for partition

2017-04-19 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976012#comment-15976012
 ] 

mingleizhang commented on FLINK-6264:
-

Hi, I don't think this is a bug when brokers fail and are restarted: the 
exception is thrown because {code}unassignedPartitions.size() > 0{code}. 
[~tzulitai] Hi, what is your opinion on this issue?
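
For context, a minimal sketch of the failure path under discussion, assuming 
a simplified partition list (illustrative only, not the actual 
{{Kafka08Fetcher}} code):

{code}
import java.util.List;

// Illustrative only, not the actual Kafka08Fetcher source.
class LeaderCheckSketch {
	static void checkLeadersFound(List<String> unassignedPartitions) {
		// Partitions still without a leader trigger the hard failure seen in
		// the stack trace; the suggestion above amounts to retrying them
		// (e.g. with a backoff) instead of throwing immediately.
		if (unassignedPartitions.size() > 0) {
			throw new RuntimeException(
				"Unable to find a leader for partitions: " + unassignedPartitions);
		}
	}
}
{code}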

> Kafka consumer fails if can't find leader for partition
> ---
>
> Key: FLINK-6264
> URL: https://issues.apache.org/jira/browse/FLINK-6264
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>
> We have observed the following error many times when brokers failed/were 
> restarted:
> java.lang.RuntimeException: Unable to find a leader for partitions: 
> [Partition: KafkaTopicPartition{topic='mytopic', partition=10}, 
> KafkaPartitionHandle=[mytopic,10], offset=-1]
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.findLeaderForPartitions(Kafka08Fetcher.java:474)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.runFetchLoop(Kafka08Fetcher.java:194)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:256)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:656)
>   at java.lang.Thread.run(Thread.java:745)





[jira] [Commented] (FLINK-6333) Utilize Bloomfilters in RocksDb

2017-04-19 Thread Fang Yong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976008#comment-15976008
 ] 

Fang Yong commented on FLINK-6333:
--

Flink currently uses RocksDB JNI 4.11.2, which already supports the 
setFilter(new BloomFilter()) method on BlockBasedTableConfig. Can bloom 
filters be used directly with RocksDB 4.11.2? Are there any other relevant 
improvements between the current version and RocksDB 5.2.1+?
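
For reference, a minimal sketch of wiring a bloom filter through the RocksDB 
Java API; the option values below are illustrative assumptions:

{code}
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Options;

public class BloomFilterConfigSketch {
	public static Options buildOptions() {
		// Attach a bloom filter to the block-based table format so point
		// lookups can skip SST blocks that cannot contain the key.
		BlockBasedTableConfig tableConfig = new BlockBasedTableConfig()
			.setBlockCacheSize(256 * 1024 * 1024)  // example: 256 MB block cache
			.setBlockSize(4 * 1024)                // example: 4 KB blocks
			.setFilter(new BloomFilter());         // defaults to ~10 bits per key

		// pass these Options when opening the RocksDB instance
		return new Options().setTableFormatConfig(tableConfig);
	}
}
{code}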

> Utilize Bloomfilters in RocksDb
> ---
>
> Key: FLINK-6333
> URL: https://issues.apache.org/jira/browse/FLINK-6333
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>
> Bloom Filters would speed up RocksDb lookups.
> When we upgrade to RocksDb 5.2.1+, we would be able to do:
> {code}
>   new BlockBasedTableConfig()
>   .setBlockCacheSize(blockCacheSize)
>   .setBlockSize(blockSize)
>   .setFilter(new BloomFilter())
> {code}





[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975987#comment-15975987
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

GitHub user sunjincheng121 opened a pull request:

https://github.com/apache/flink/pull/3743

[FLINK-6228][table] Integrating the OVER windows in the Table API (st…

In this PR I integrated the OVER windows in the Table API. The syntax and 
usage examples are as follows:
* Syntax:
```
table
   .overWindows(
(Rows|Range [ partitionBy value_expression , ... [ n ]] [ orderBy 
order_by_expression] 
  (preceding  
UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW)
 [following 
UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW]
as alias,...[n])
   )
  .select( [col1,...[n]], (agg(col1) OVER overWindowAlias, … [n])

```
* examples:
```
// Rows clause
table
   .window(Over partitionBy 'c orderBy 'rowTime  preceding 2.rows as 'w1)
   .select(
 'c,
 'b.count over 'w1 as 'countB,
 'e.sum over 'w1 as 'sumE)

// Range clause
table
   .window(Over partitionBy 'c orderBy 'rowTime preceding 2.milli as 'w1)
   .select(
 'c,
 'b.count over 'w1 as 'countB,
 'e.sum over 'w1 as 'sumE)
```
* More detail Info : 
https://docs.google.com/document/d/13Z-Ovx3jwtmzkSweJyGkMy751BouhuJ29Y1CTNZt2DI/edit#

NOTE: The documentation of the OVER Table API is not included in this PR.
- [x] General
  - The pull request references the related JIRA issue 
("[FLINK-6228][table] Integrating the OVER windows in the Table API")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sunjincheng121/flink FLINK-6228-PR

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3743.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3743


commit 03d4153be93d505cdb47d174e5aafe10eb93a45f
Author: sunjincheng121 
Date:   2017-04-13T09:36:18Z

[FLINK-6228][table] Integrating the OVER windows in the Table API (stream)




> Integrating the OVER windows in the Table API
> -
>
> Key: FLINK-6228
> URL: https://issues.apache.org/jira/browse/FLINK-6228
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Syntax:
> {code}
> table
>.overWindows(
> (Rows|Range [ partitionBy value_expression , ... [ n ]] [ orderBy 
> order_by_expression] 
>   (preceding  
> UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW)
>  [following 
> UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW]
> as alias,...[n])
>)
>   .select( [col1,...[n]], (agg(col1) OVER overWindowAlias, … [n])
> {code}
> Implementation restrictions:
> * All OVER clauses in the same SELECT clause must be exactly the same.
> * The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> * ORDER BY: before the 
> [FLINK-5884|https://issues.apache.org/jira/browse/FLINK-5884] implementation, 
> orderBy may only reference 'rowtime / 'proctime (for stream) or a specific 
> time field (for batch).
> * FOLLOWING is not supported.
> User interface design document [See | 
> https://docs.google.com/document/d/13Z-Ovx3jwtmzkSweJyGkMy751BouhuJ29Y1CTNZt2DI/edit#]





[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-19 Thread sunjincheng121
GitHub user sunjincheng121 opened a pull request:

https://github.com/apache/flink/pull/3743

[FLINK-6228][table] Integrating the OVER windows in the Table API (st…

In this PR I integrated the OVER windows in the Table API. The syntax and 
usage examples are as follows:
* Syntax:
```
table
   .overWindows(
(Rows|Range [ partitionBy value_expression , ... [ n ]] [ orderBy 
order_by_expression] 
  (preceding  
UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW)
 [following 
UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW]
as alias,...[n])
   )
  .select( [col1,...[n]], (agg(col1) OVER overWindowAlias, … [n])

```
* examples:
```
// Rows clause
table
   .window(Over partitionBy 'c orderBy 'rowTime  preceding 2.rows as 'w1)
   .select(
 'c,
 'b.count over 'w1 as 'countB,
 'e.sum over 'w1 as 'sumE)

// Range clause
table
   .window(Over partitionBy 'c orderBy 'rowTime preceding 2.milli as 'w1)
   .select(
 'c,
 'b.count over 'w1 as 'countB,
 'e.sum over 'w1 as 'sumE)
```
* More detail Info : 
https://docs.google.com/document/d/13Z-Ovx3jwtmzkSweJyGkMy751BouhuJ29Y1CTNZt2DI/edit#

NOTE: The documentation of the OVER Table API is not included in this PR.
- [x] General
  - The pull request references the related JIRA issue 
("[FLINK-6228][table] Integrating the OVER windows in the Table API")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sunjincheng121/flink FLINK-6228-PR

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3743.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3743


commit 03d4153be93d505cdb47d174e5aafe10eb93a45f
Author: sunjincheng121 
Date:   2017-04-13T09:36:18Z

[FLINK-6228][table] Integrating the OVER windows in the Table API (stream)






[jira] [Commented] (FLINK-6120) Implement heartbeat logic between JobManager and ResourceManager

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975934#comment-15975934
 ] 

ASF GitHub Bot commented on FLINK-6120:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/3645
  
@tillrohrmann , just a kind reminder for this last heartbeat PR. 


> Implement heartbeat logic between JobManager and ResourceManager
> 
>
> Key: FLINK-6120
> URL: https://issues.apache.org/jira/browse/FLINK-6120
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: zhijiang
>Assignee: zhijiang
>
> It is part of the work for FLIP-6.
> The HeartbeatManager is mainly used for monitoring a heartbeat target and 
> reporting payloads.
> On the {{ResourceManager}} side, it starts monitoring the {{HeartbeatTarget}} 
> when it receives a registration from a {{JobManager}}, and schedules a task 
> to {{requestHeartbeat}} at a fixed interval. If no heartbeat response is 
> received within the timeout duration, the {{HeartbeatListener}} will report 
> a heartbeat timeout, and the {{ResourceManager}} should then remove the 
> internally registered {{JobManager}}.
> On the {{JobManager}} side, it starts monitoring the {{HeartbeatTarget}} when 
> it receives a registration acknowledgement from the {{ResourceManager}}. It 
> will likewise be notified of a heartbeat timeout if it does not receive a 
> heartbeat request from the {{ResourceManager}} within the timeout duration.
> The current implementation does not exchange payloads via heartbeats; this 
> can be added later if needed.
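
A generic sketch of the monitor-and-timeout pattern described above, built on 
plain JDK scheduling; the class and method names are hypothetical, not Flink's 
{{HeartbeatManager}} API:

{code}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: request a heartbeat at a fixed interval and
// report a timeout if no response arrived within the configured duration.
class HeartbeatMonitorSketch {
	private final ScheduledExecutorService scheduler =
		Executors.newSingleThreadScheduledExecutor();
	private volatile long lastResponseNanos = System.nanoTime();

	void start(long intervalMs, long timeoutMs,
			Runnable requestHeartbeat, Runnable onTimeout) {
		scheduler.scheduleAtFixedRate(() -> {
			requestHeartbeat.run(); // e.g. an RPC to the monitored target
			long silentMs = (System.nanoTime() - lastResponseNanos) / 1_000_000;
			if (silentMs > timeoutMs) {
				onTimeout.run(); // e.g. unregister the timed-out JobManager
			}
		}, 0, intervalMs, TimeUnit.MILLISECONDS);
	}

	// called whenever a heartbeat response arrives
	void reportHeartbeat() {
		lastResponseNanos = System.nanoTime();
	}
}
{code}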





[GitHub] flink issue #3645: [FLINK-6120][Distributed Coordinator]Implement heartbeat ...

2017-04-19 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/3645
  
@tillrohrmann , just a kind reminder for this last heartbeat PR. 




[jira] [Updated] (FLINK-6334) Refactoring UDTF interface

2017-04-19 Thread Shaoxuan Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shaoxuan Wang updated FLINK-6334:
-
Description: The current UDTF leverages the table.join(expression) 
interface, which is not a proper interface in terms of semantics. We would like 
to refactor this to let UDTF use table.join(table) interface. Very briefly,  
UDTF's apply method will return a Table Type, so Join(UDTF('a, 'b, ...) as 'c) 
shall be viewed as join(Table)  (was: UDTF's apply method returns a Table Type, 
so Join(UDTF('a, 'b, ...) as 'c) shall be viewed as join(Table))

> Refactoring UDTF interface
> --
>
> Key: FLINK-6334
> URL: https://issues.apache.org/jira/browse/FLINK-6334
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>
> The current UDTF leverages the table.join(expression) interface, which is not 
> a proper interface in terms of semantics. We would like to refactor this to 
> let UDTF use table.join(table) interface. Very briefly,  UDTF's apply method 
> will return a Table Type, so Join(UDTF('a, 'b, ...) as 'c) shall be viewed as 
> join(Table)





[jira] [Created] (FLINK-6334) Refactoring UDTF interface

2017-04-19 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-6334:
-

 Summary: Refactoring UDTF interface
 Key: FLINK-6334
 URL: https://issues.apache.org/jira/browse/FLINK-6334
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Ruidong Li


UDTF's apply method returns a Table Type, so Join(UDTF('a, 'b, ...) as 'c) 
shall be viewed as join(Table)





[jira] [Created] (FLINK-6333) Utilize Bloomfilters in RocksDb

2017-04-19 Thread Ted Yu (JIRA)
Ted Yu created FLINK-6333:
-

 Summary: Utilize Bloomfilters in RocksDb
 Key: FLINK-6333
 URL: https://issues.apache.org/jira/browse/FLINK-6333
 Project: Flink
  Issue Type: Improvement
Reporter: Ted Yu


Bloom Filters would speed up RocksDb lookups.

When we upgrade to RocksDb 5.2.1+, we would be able to do:
{code}
  new BlockBasedTableConfig()
  .setBlockCacheSize(blockCacheSize)
  .setBlockSize(blockSize)
  .setFilter(new BloomFilter())
{code}





[jira] [Commented] (FLINK-6295) use LoadingCache instead of WeakHashMap to lower latency

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975860#comment-15975860
 ] 

ASF GitHub Bot commented on FLINK-6295:
---

Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3709
  
@zentol How do we know whether a requested job is suspended or not, as the 
status of jobs in the backend is always changing?


> use LoadingCache instead of WeakHashMap to lower latency
> 
>
> Key: FLINK-6295
> URL: https://issues.apache.org/jira/browse/FLINK-6295
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Tao Wang
>Assignee: Tao Wang
>
> Now in ExecutionGraphHolder, which is used in many handlers, we use a 
> WeakHashMap to cache ExecutionGraph(s), whose eviction is only sensitive to 
> garbage collection.
> The latency is too high when the JVM runs GC rarely, which leaves the status 
> of jobs or their tasks out of sync with the real ones.
> LoadingCache is a commonly used cache implementation from the Guava library; 
> we can use its time-based eviction to lower the latency of status updates.
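
A minimal sketch of the proposed time-based eviction with Guava's 
LoadingCache; the key/value types and the loader below are illustrative 
assumptions, not the {{ExecutionGraphHolder}} code:

{code}
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.concurrent.TimeUnit;

class ExecutionGraphCacheSketch {
	// Entries expire a fixed time after being written, so handlers re-fetch
	// a fresh graph instead of serving a stale one until the next GC.
	private final LoadingCache<String, String> cache = CacheBuilder.newBuilder()
		.expireAfterWrite(10, TimeUnit.SECONDS)
		.build(new CacheLoader<String, String>() {
			@Override
			public String load(String jobId) {
				return fetchExecutionGraph(jobId); // hypothetical loader
			}
		});

	private String fetchExecutionGraph(String jobId) {
		return "graph-for-" + jobId; // placeholder for the real lookup
	}
}
{code}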





[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-19 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3709
  
@zentol How do we know whether a requested job is suspended or not, as the 
status of jobs in the backend is always changing?




[jira] [Closed] (FLINK-6327) Bug in CommonCalc's estimateRowCount() method

2017-04-19 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-6327.
-
   Resolution: Fixed
Fix Version/s: 1.3.0

> Bug in CommonCalc's estimateRowCount() method
> -
>
> Key: FLINK-6327
> URL: https://issues.apache.org/jira/browse/FLINK-6327
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: godfrey he
> Fix For: 1.3.0
>
>
> {{(rowCnt * 0.75).min(1.0)}} should be changed to {{(rowCnt * 0.75).max(1.0)}}
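
A quick numeric check of why the change matters (plain Java rather than the 
Scala source): {{min}} caps every estimate at one row, while {{max}} merely 
floors it at one row:

{code}
public class RowCountEstimateCheck {
	public static void main(String[] args) {
		double rowCnt = 100.0;
		System.out.println(Math.min(rowCnt * 0.75, 1.0)); // 1.0  (buggy: always capped)
		System.out.println(Math.max(rowCnt * 0.75, 1.0)); // 75.0 (fixed: floored at 1)
	}
}
{code}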





[jira] [Commented] (FLINK-6327) Bug in CommonCalc's estimateRowCount() method

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975845#comment-15975845
 ] 

ASF GitHub Bot commented on FLINK-6327:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3740


> Bug in CommonCalc's estimateRowCount() method
> -
>
> Key: FLINK-6327
> URL: https://issues.apache.org/jira/browse/FLINK-6327
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: godfrey he
> Fix For: 1.3.0
>
>
> {{(rowCnt * 0.75).min(1.0)}} should be changed to {{(rowCnt * 0.75).max(1.0)}}





[GitHub] flink pull request #3740: [FLINK-6327] [table] Bug in CommonCalc's estimateR...

2017-04-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3740




[jira] [Updated] (FLINK-6327) Bug in CommonCalc's estimateRowCount() method

2017-04-19 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-6327:
--
Component/s: Table API & SQL

> Bug in CommonCalc's estimateRowCount() method
> -
>
> Key: FLINK-6327
> URL: https://issues.apache.org/jira/browse/FLINK-6327
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: godfrey he
>
> {{(rowCnt * 0.75).min(1.0)}} should be changed to {{(rowCnt * 0.75).max(1.0)}}





[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112344921
  
--- Diff: flink-metrics/flink-metrics-datadog/pom.xml ---
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Apache license header omitted here -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-metrics</artifactId>
+		<version>1.3-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-metrics-datadog</artifactId>
+	<version>1.3-SNAPSHOT</version>
+
--- End diff --

okhttp and okio are self-contained and don't have any other dependencies.

Can you please elaborate on "you should create a fat-jar using the 
maven-shade-plugin"? I have never done Maven jar shading before...




[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112344622
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.datadog.utils.TimestampUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Abstract metric of Datadog for serialization
+ * */
+public abstract class DMetric {
+   private final String metric; // Metric name
--- End diff --

yes, exactly





[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112344522
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.datadog.utils.SerializationUtils;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Metric Reporter for Datadog
+ *
+ * Variables in metrics scope will be sent to Datadog as tags
+ * */
+public class DatadogHttpReporter implements MetricReporter, 
CharacterFilter, Scheduled {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(DatadogHttpReporter.class);
+
+   // Both Flink's Gauge and Meter values are taken as gauge in Datadog
+   private final Map<Gauge, DGauge> gauges = new ConcurrentHashMap<>();
+   private final Map<Counter, DCounter> counters = new ConcurrentHashMap<>();
+   private final Map<Meter, DMeter> meters = new ConcurrentHashMap<>();
+
+   private DatadogHttpClient client;
+   private List<String> configTags;
+
+   public static final String API_KEY = "apikey";
+   public static final String TAGS = "tags";
+
+   @Override
+   public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
+   final String name = group.getMetricIdentifier(metricName, this);
--- End diff --

make sense. Refactored as you suggested.

btw, what's CharacterFilter#filterCharacters() for?
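
For context, a `CharacterFilter` lets a reporter sanitize metric-name 
components before they are sent to the backend; the reporter passes itself 
into `getMetricIdentifier` for that purpose. A minimal illustrative 
implementation (the replacement rule is an assumption, not this PR's code):

```java
import org.apache.flink.metrics.CharacterFilter;

// Illustrative only: replace characters the metrics backend might reject.
public class SanitizingFilterSketch implements CharacterFilter {
	@Override
	public String filterCharacters(String input) {
		return input.replaceAll("[^a-zA-Z0-9_.]", "_");
	}
}
```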




[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112344359
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.datadog.utils.SerializationUtils;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Metric Reporter for Datadog
+ *
+ * Variables in metrics scope will be sent to Datadog as tags
+ * */
+public class DatadogHttpReporter implements MetricReporter, 
CharacterFilter, Scheduled {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(DatadogHttpReporter.class);
+
+   // Both Flink's Gauge and Meter values are taken as gauge in Datadog
+   private final Map<Gauge, DGauge> gauges = new ConcurrentHashMap<>();
+   private final Map<Counter, DCounter> counters = new ConcurrentHashMap<>();
+   private final Map<Meter, DMeter> meters = new ConcurrentHashMap<>();
+
+   private DatadogHttpClient client;
+   private List<String> configTags;
+
+   public static final String API_KEY = "apikey";
+   public static final String TAGS = "tags";
+
+   @Override
+   public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
+   final String name = group.getMetricIdentifier(metricName, this);
+
+   List<String> tags = new ArrayList<>(configTags);
+   tags.addAll(getTagsFromMetricGroup(group));
+
+   synchronized (this) {
+   if (metric instanceof Counter) {
+   Counter c = (Counter) metric;
+   counters.put(c, new DCounter(c, name, tags));
+   } else if (metric instanceof Gauge) {
+   Gauge g = (Gauge) metric;
+   gauges.put(g, new DGauge(g, name, tags));
+   } else if(metric instanceof Meter) {
+   Meter m = (Meter) metric;
+   // Only consider rate
+   meters.put(m, new DMeter(m, name, tags));
+   } else if (metric instanceof Histogram) {
+   LOGGER.warn("Cannot add {} because Datadog HTTP 
API doesn't support Histogram", metricName);
+   } else {
+   LOGGER.warn("Cannot add unknown metric type {}. 
This indicates that the reporter " +
+   "does not support this metric type.", 
metric.getClass().getName());
+   }
+   }
+   }
+
+   @Override
+   public void notifyOfRemovedMetric(Metric metric, String metricName, 
MetricGroup group) {
+   synchronized (this) {
+   if (metric instanceof Counter) {
+   counters.remove(metric);
+   } else if (metric instanceof Gauge) {
+   gauges.remove(metric);
+   } else if (metric instanceof Meter) {
+   meters.remove(metric);
+   } else if (metric 

[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112344296
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/utils/SerializationUtils.java
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class SerializationUtils {
--- End diff --

moved to DatadogHttpClient




[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112344276
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/utils/TimestampUtils.java
 ---
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog.utils;
+
+public class TimestampUtils {
--- End diff --

moved to DMetric




[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112342781
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.datadog.utils.SerializationUtils;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Metric Reporter for Datadog
+ *
+ * Variables in metrics scope will be sent to Datadog as tags
+ * */
+public class DatadogHttpReporter implements MetricReporter, 
CharacterFilter, Scheduled {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(DatadogHttpReporter.class);
+
+   // Both Flink's Gauge and Meter values are taken as gauge in Datadog
+   private final Map<Gauge, DGauge> gauges = new ConcurrentHashMap<>();
+   private final Map<Counter, DCounter> counters = new ConcurrentHashMap<>();
+   private final Map<Meter, DMeter> meters = new ConcurrentHashMap<>();
+
+   private DatadogHttpClient client;
+   private List<String> configTags;
+
+   public static final String API_KEY = "apikey";
+   public static final String TAGS = "tags";
+
+   @Override
+   public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
+   final String name = group.getMetricIdentifier(metricName, this);
+
+   List<String> tags = new ArrayList<>(configTags);
+   tags.addAll(getTagsFromMetricGroup(group));
+
+   synchronized (this) {
+   if (metric instanceof Counter) {
+   Counter c = (Counter) metric;
+   counters.put(c, new DCounter(c, name, tags));
+   } else if (metric instanceof Gauge) {
+   Gauge g = (Gauge) metric;
+   gauges.put(g, new DGauge(g, name, tags));
+   } else if(metric instanceof Meter) {
+   Meter m = (Meter) metric;
+   // Only consider rate
+   meters.put(m, new DMeter(m, name, tags));
+   } else if (metric instanceof Histogram) {
+   LOGGER.warn("Cannot add {} because Datadog HTTP 
API doesn't support Histogram", metricName);
+   } else {
+   LOGGER.warn("Cannot add unknown metric type {}. 
This indicates that the reporter " +
+   "does not support this metric type.", 
metric.getClass().getName());
+   }
+   }
+   }
+
+   @Override
+   public void notifyOfRemovedMetric(Metric metric, String metricName, 
MetricGroup group) {
+   synchronized (this) {
+   if (metric instanceof Counter) {
+   counters.remove(metric);
+   } else if (metric instanceof Gauge) {
+   gauges.remove(metric);
+   } else if (metric instanceof Meter) {
+   meters.remove(metric);
+   } else if (metric 

[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112341874
  
--- Diff: flink-metrics/flink-metrics-datadog/pom.xml ---
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Apache license header omitted here -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-metrics</artifactId>
+		<version>1.3-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-metrics-datadog</artifactId>
+	<version>1.3-SNAPSHOT</version>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.squareup.okhttp3</groupId>
+			<artifactId>okhttp</artifactId>
+			<version>3.6.0</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.squareup.okio</groupId>
+			<artifactId>okio</artifactId>
+			<version>1.11.0</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.fasterxml.jackson.core</groupId>
+			<artifactId>jackson-databind</artifactId>
--- End diff --

done




[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112341697
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.RequestBody;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Http client talking to Datadog
+ * */
+public class DatadogHttpClient{
+   private static final String SERIES_URL_FORMAT = 
"https://app.datadoghq.com/api/v1/series?api_key=%s";
+   private static final String VALIDATE_URL_FORMAT = 
"https://app.datadoghq.com/api/v1/validate?api_key=%s";
+   private static final MediaType MEDIA_TYPE = 
MediaType.parse("application/json; charset=utf-8");
+   private static final int TIMEOUT = 5;
+
+   private final String seriesUrl;
+   private final String validateUrl;
+   private final OkHttpClient client;
+   private final String apiKey;
+
+   public DatadogHttpClient(String dgApiKey) {
+   if(dgApiKey == null || dgApiKey.isEmpty()) {
+   throw new IllegalArgumentException(
+   "Invalid API key:" + dgApiKey);
+   }
+   apiKey = dgApiKey;
+   client = new OkHttpClient.Builder()
+   .connectTimeout(TIMEOUT, TimeUnit.SECONDS)
+   .writeTimeout(TIMEOUT, TimeUnit.SECONDS)
+   .readTimeout(TIMEOUT, TimeUnit.SECONDS)
+   .build();
+
+   seriesUrl = String.format(SERIES_URL_FORMAT, apiKey);
+   validateUrl = String.format(VALIDATE_URL_FORMAT, apiKey);
+   validateApiKey();
+   }
+
+   private void validateApiKey() {
+   Request r = new 
Request.Builder().url(validateUrl).get().build();
+
+   try {
+   Response response = client.newCall(r).execute();
+   if(!response.isSuccessful()) {
--- End diff --

I can't think of any case where it fails for a reason other than an invalid 
key.

If the network or the Datadog endpoint is down, the response will just time 
out, and the code path goes to the catch clause.




[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112341399
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.RequestBody;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Http client talking to Datadog
+ * */
+public class DatadogHttpClient{
+   private static final String SERIES_URL_FORMAT = 
"https://app.datadoghq.com/api/v1/series?api_key=%s";
+   private static final String VALIDATE_URL_FORMAT = 
"https://app.datadoghq.com/api/v1/validate?api_key=%s";
+   private static final MediaType MEDIA_TYPE = 
MediaType.parse("application/json; charset=utf-8");
+   private static final int TIMEOUT = 5;
+
+   private final String seriesUrl;
+   private final String validateUrl;
+   private final OkHttpClient client;
+   private final String apiKey;
+
+   public DatadogHttpClient(String dgApiKey) {
+   if(dgApiKey == null || dgApiKey.isEmpty()) {
+   throw new IllegalArgumentException(
+   "Invalid API key:" + dgApiKey);
+   }
+   apiKey = dgApiKey;
+   client = new OkHttpClient.Builder()
+   .connectTimeout(TIMEOUT, TimeUnit.SECONDS)
+   .writeTimeout(TIMEOUT, TimeUnit.SECONDS)
+   .readTimeout(TIMEOUT, TimeUnit.SECONDS)
+   .build();
+
+   seriesUrl = String.format(SERIES_URL_FORMAT, apiKey);
+   validateUrl = String.format(VALIDATE_URL_FORMAT, apiKey);
+   validateApiKey();
+   }
+
+   private void validateApiKey() {
+   Request r = new 
Request.Builder().url(validateUrl).get().build();
+
+   try {
+   Response response = client.newCall(r).execute();
+   if(!response.isSuccessful()) {
--- End diff --

Makes sense. I'll broaden the failure scenarios a bit.




[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112341238
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.datadog.utils.TimestampUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Abstract metric of Datadog for serialization
+ * */
+public abstract class DMetric {
+   private final String metric; // Metric name
+   private final MetricType type;
+   private final List<String> tags;
+
+   public DMetric(MetricType metricType, String metric, List<String> tags) 
{
+   this.type = metricType;
+   this.metric = metric;
+   this.tags = tags;
+   }
+
+   public MetricType getType() {
+   return type;
--- End diff --

That's the Python API, which already encapsulates metric types. I'm using 
the HTTP API, which is documented here: 
http://docs.datadoghq.com/api/?lang=console#metrics-post
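
For reference, a series POST body is shaped roughly like this (the metric 
name and tags below are made up):

    {
      "series": [
        {
          "metric": "flink.test.gauge",
          "points": [[1492600000, 42]],
          "type": "gauge",
          "tags": ["host:localhost", "env:prod"]
        }
      ]
    }

so `type` is a per-series field in the HTTP payload.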


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112340825
  
--- Diff: docs/monitoring/metrics.md ---
@@ -436,6 +436,35 @@ metrics.reporter.stsd.port: 8125
 
 {% endhighlight %}
 
+### Datadog (org.apache.flink.metrics.datadog.DatadogHttpReporter)
+
+In order to use this reporter you must copy 
`/opt/flink-metrics-datadog-{{site.version}}.jar` into the `/lib` folder
+of your Flink distribution.
+
+Parameters:
+
+- `apikey` - the Datadog API key
+- `tags` - (optional) the global tags that will be applied to metrics when 
sending to Datadog. Tags should be separated by commas only
+
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporters: dghttp
+metrics.reporter.dghttp.class: 
org.apache.flink.metrics.datadog.DatadogHttpReporter
+metrics.reporter.dghttp.apikey: xxx
+metrics.reporter.dghttp.tags: myflinkapp,prod
+
+// <host>, <job_name>, <tm_id>, <job_id>, <task_name>, <subtask_index> 
 will be sent to Datadog as tags
+metrics.scope.jm: <host>.jobmanager
--- End diff --

Simplified it and moved it out of the code highlight.

I believe it's necessary to explain to users how to find and filter tags in 
Datadog, in order to ramp them up faster and remove any potential friction.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112340582
  
--- Diff: docs/monitoring/metrics.md ---
@@ -436,6 +436,35 @@ metrics.reporter.stsd.port: 8125
 
 {% endhighlight %}
 
+### Datadog (org.apache.flink.metrics.datadog.DatadogHttpReporter)
+
+In order to use this reporter you must copy 
`/opt/flink-metrics-datadog-{{site.version}}.jar` into the `/lib` folder
+of your Flink distribution.
+
+Parameters:
+
+- `apikey` - the Datadog API key
+- `tags` - (optional) the global tags that will be applied to metrics when 
sending to Datadog. Tags should be separated by commas only
+
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporters: dghttp
+metrics.reporter.dghttp.class: 
org.apache.flink.metrics.datadog.DatadogHttpReporter
+metrics.reporter.dghttp.apikey: xxx
+metrics.reporter.dghttp.tags: myflinkapp,prod
+
+// <host>, <job_name>, <tm_id>, <job_id>, <task_name>, <subtask_index> 
 will be sent to Datadog as tags
+metrics.scope.jm: <host>.jobmanager
--- End diff --

removed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112340440
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.datadog.utils.SerializationUtils;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Metric Reporter for Datadog
+ *
+ * Variables in metrics scope will be sent to Datadog as tags
+ * */
+public class DatadogHttpReporter implements MetricReporter, 
CharacterFilter, Scheduled {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(DatadogHttpReporter.class);
+
+   // Both Flink's Gauge and Meter values are taken as gauge in Datadog
+   private final Map<Gauge, DGauge> gauges = new ConcurrentHashMap<>();
+   private final Map<Counter, DCounter> counters = new 
ConcurrentHashMap<>();
+   private final Map<Meter, DMeter> meters = new ConcurrentHashMap<>();
+
+   private DatadogHttpClient client;
+   private List<String> configTags;
+
+   public static final String API_KEY = "apikey";
+   public static final String TAGS = "tags";
+
+   @Override
+   public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
+   final String name = group.getMetricIdentifier(metricName, this);
+
+   List<String> tags = new ArrayList<>(configTags);
+   tags.addAll(getTagsFromMetricGroup(group));
+
+   synchronized (this) {
+   if (metric instanceof Counter) {
+   Counter c = (Counter) metric;
+   counters.put(c, new DCounter(c, name, tags));
+   } else if (metric instanceof Gauge) {
+   Gauge g = (Gauge) metric;
+   gauges.put(g, new DGauge(g, name, tags));
+   } else if(metric instanceof Meter) {
+   Meter m = (Meter) metric;
+   // Only consider rate
+   meters.put(m, new DMeter(m, name, tags));
+   } else if (metric instanceof Histogram) {
+   LOGGER.warn("Cannot add {} because Datadog HTTP 
API doesn't support Histogram", metricName);
+   } else {
+   LOGGER.warn("Cannot add unknown metric type {}. 
This indicates that the reporter " +
+   "does not support this metric type.", 
metric.getClass().getName());
+   }
+   }
+   }
+
+   @Override
+   public void notifyOfRemovedMetric(Metric metric, String metricName, 
MetricGroup group) {
+   synchronized (this) {
+   if (metric instanceof Counter) {
+   counters.remove(metric);
+   } else if (metric instanceof Gauge) {
+   gauges.remove(metric);
+   } else if (metric instanceof Meter) {
+   meters.remove(metric);
+   } else if (metric 

[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112340350
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpReporterTests.java
 ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class DatadogHttpReporterTests {
+   private static DatadogHttpReporter reporter;
+
+   @BeforeClass
--- End diff --

sure


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112340221
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpReporterTests.java
 ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class DatadogHttpReporterTests {
+   private static DatadogHttpReporter reporter;
+
+   @BeforeClass
+   public static void init() {
+   reporter = new DatadogHttpReporter();
+   }
+
+   @Test
+   public void testFilterChars() {
+   assertEquals("", reporter.filterCharacters(""));
+assertEquals("abc", reporter.filterCharacters("abc"));
--- End diff --

Hmm... it's showing fine on my machine. Reindented it anyway...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112340046
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.RequestBody;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Http client talking to Datadog
+ * */
+public class DatadogHttpClient{
+   private static final String SERIES_URL_FORMAT = 
"https://app.datadoghq.com/api/v1/series?api_key=%s";
+   private static final String VALIDATE_URL_FORMAT = 
"https://app.datadoghq.com/api/v1/validate?api_key=%s";
+   private static final MediaType MEDIA_TYPE = 
MediaType.parse("application/json; charset=utf-8");
+   private static final int TIMEOUT = 5;
+
+   private final String seriesUrl;
+   private final String validateUrl;
+   private final OkHttpClient client;
--- End diff --

You might be looking at shutting down the client's cache. As the doc there 
mentions, "Shutdown isn't necessary" :)
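
For the record, if explicit teardown were ever wanted, OkHttp's docs 
describe roughly this (sketch; `client` is the OkHttpClient instance):

    // Stop accepting new calls, then close idle connections
    client.dispatcher().executorService().shutdown();
    client.connectionPool().evictAll();

Since "Shutdown isn't necessary", simply dropping the reference is fine 
here.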


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112339524
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.Meter;
+
+import java.util.List;
+
+/**
+ * Mapping of meter between Flink and Datadog
+ *
+ * Only consider rate of the meter, due to Datadog HTTP API's limited 
support of meter
+ * */
+public class DMeter extends DMetric {
+   private final Meter meter;
--- End diff --

I double-checked. They are the same, using tabs. Maybe a GitHub UI issue if 
you saw something different.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112339291
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.datadog.utils.SerializationUtils;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Metric Reporter for Datadog
+ *
+ * Variables in metrics scope will be sent to Datadog as tags
+ * */
+public class DatadogHttpReporter implements MetricReporter, 
CharacterFilter, Scheduled {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(DatadogHttpReporter.class);
+
+   // Both Flink's Gauge and Meter values are taken as gauge in Datadog
+   private final Map<Gauge, DGauge> gauges = new ConcurrentHashMap<>();
+   private final Map<Counter, DCounter> counters = new 
ConcurrentHashMap<>();
+   private final Map<Meter, DMeter> meters = new ConcurrentHashMap<>();
+
+   private DatadogHttpClient client;
+   private List<String> configTags;
+
+   public static final String API_KEY = "apikey";
+   public static final String TAGS = "tags";
+
+   @Override
+   public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
+   final String name = group.getMetricIdentifier(metricName, this);
+
+   List<String> tags = new ArrayList<>(configTags);
+   tags.addAll(getTagsFromMetricGroup(group));
+
+   synchronized (this) {
+   if (metric instanceof Counter) {
+   Counter c = (Counter) metric;
+   counters.put(c, new DCounter(c, name, tags));
+   } else if (metric instanceof Gauge) {
+   Gauge g = (Gauge) metric;
+   gauges.put(g, new DGauge(g, name, tags));
--- End diff --

I made DGauge generic, rather than tied to any specific type. The reason is 
that I found the metric on this line sometimes doesn't have any value yet, 
and calling Gauge#getValue() then throws a NullPointerException.

The part that checks whether a Gauge returns a number is in 
DatadogHttpReporter#report()
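
A rough sketch of that guard in report() (the surrounding serialization 
code is elided):

    try {
        Object value = gauge.getValue();
        if (value instanceof Number) {
            // safe to serialize this gauge as a Datadog data point
        }
    } catch (NullPointerException e) {
        // the underlying gauge has no value yet; skip it this round
    }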


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112338938
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.datadog.utils.SerializationUtils;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Metric Reporter for Datadog
+ *
+ * Variables in metrics scope will be sent to Datadog as tags
+ * */
+public class DatadogHttpReporter implements MetricReporter, 
CharacterFilter, Scheduled {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(DatadogHttpReporter.class);
+
+   // Both Flink's Gauge and Meter values are taken as gauge in Datadog
+   private final Map<Gauge, DGauge> gauges = new ConcurrentHashMap<>();
+   private final Map<Counter, DCounter> counters = new 
ConcurrentHashMap<>();
+   private final Map<Meter, DMeter> meters = new ConcurrentHashMap<>();
+
+   private DatadogHttpClient client;
+   private List<String> configTags;
+
+   public static final String API_KEY = "apikey";
+   public static final String TAGS = "tags";
+
+   @Override
+   public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
+   final String name = group.getMetricIdentifier(metricName, this);
+
+   List<String> tags = new ArrayList<>(configTags);
+   tags.addAll(getTagsFromMetricGroup(group));
+
+   synchronized (this) {
--- End diff --

sure


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6332) Upgrade Scala version to 2.11.11

2017-04-19 Thread Ted Yu (JIRA)
Ted Yu created FLINK-6332:
-

 Summary: Upgrade Scala version to 2.11.11
 Key: FLINK-6332
 URL: https://issues.apache.org/jira/browse/FLINK-6332
 Project: Flink
  Issue Type: Improvement
Reporter: Ted Yu
Priority: Minor


Currently scala-2.11 profile uses Scala 2.11.7

2.11.11 is the most recent version.

This issue is to upgrade to Scala 2.11.11



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975645#comment-15975645
 ] 

ASF GitHub Bot commented on FLINK-6311:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/3738
  
@StephanEwen @tzulitai Thanks for the useful information. I will fix it 
soon. I really appreciate it. 


> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: mingleizhang
>
> This was reported by an user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected 
> against the condition when the source was closed before it started running. 
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3738: [FLINK-6311] [Kinesis Connector] NPE in FlinkKinesisConsu...

2017-04-19 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/3738
  
@StephanEwen @tzulitai Thanks for the useful information. I will fix it 
soon. I really appreciate it. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112305453
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/utils/TimestampUtils.java
 ---
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog.utils;
+
+public class TimestampUtils {
--- End diff --

This class seems overkill; could also be a utility method in the 
DatadogHttpClient.
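
E.g. a sketch of the folded-in helper (assuming the class just converts to 
POSIX seconds, which is what Datadog's series API expects):

    public static long getUnixEpochTimestamp() {
        // Datadog takes second-resolution POSIX timestamps
        return System.currentTimeMillis() / 1000;
    }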


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112305482
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/utils/SerializationUtils.java
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class SerializationUtils {
--- End diff --

This class seems overkill; could also be a utility method in the 
DatadogHttpClient.
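
As a sketch, the Jackson call could become a small static helper on the 
client instead (jackson-databind API):

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static String serialize(Object obj) throws JsonProcessingException {
        // Turns the metric POJOs into the JSON request body
        return MAPPER.writeValueAsString(obj);
    }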


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112305118
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.datadog.utils.SerializationUtils;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Metric Reporter for Datadog
+ *
+ * Variables in metrics scope will be sent to Datadog as tags
+ * */
+public class DatadogHttpReporter implements MetricReporter, 
CharacterFilter, Scheduled {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(DatadogHttpReporter.class);
+
+   // Both Flink's Gauge and Meter values are taken as gauge in Datadog
+   private final Map<Gauge, DGauge> gauges = new ConcurrentHashMap<>();
+   private final Map<Counter, DCounter> counters = new 
ConcurrentHashMap<>();
+   private final Map<Meter, DMeter> meters = new ConcurrentHashMap<>();
+
+   private DatadogHttpClient client;
+   private List<String> configTags;
+
+   public static final String API_KEY = "apikey";
+   public static final String TAGS = "tags";
+
+   @Override
+   public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
+   final String name = group.getMetricIdentifier(metricName, this);
+
+   List<String> tags = new ArrayList<>(configTags);
+   tags.addAll(getTagsFromMetricGroup(group));
+
+   synchronized (this) {
+   if (metric instanceof Counter) {
+   Counter c = (Counter) metric;
+   counters.put(c, new DCounter(c, name, tags));
+   } else if (metric instanceof Gauge) {
+   Gauge g = (Gauge) metric;
+   gauges.put(g, new DGauge(g, name, tags));
+   } else if(metric instanceof Meter) {
+   Meter m = (Meter) metric;
+   // Only consider rate
+   meters.put(m, new DMeter(m, name, tags));
+   } else if (metric instanceof Histogram) {
+   LOGGER.warn("Cannot add {} because Datadog HTTP 
API doesn't support Histogram", metricName);
+   } else {
+   LOGGER.warn("Cannot add unknown metric type {}. 
This indicates that the reporter " +
+   "does not support this metric type.", 
metric.getClass().getName());
+   }
+   }
+   }
+
+   @Override
+   public void notifyOfRemovedMetric(Metric metric, String metricName, 
MetricGroup group) {
+   synchronized (this) {
+   if (metric instanceof Counter) {
+   counters.remove(metric);
+   } else if (metric instanceof Gauge) {
+   gauges.remove(metric);
+   } else if (metric instanceof Meter) {
+   meters.remove(metric);
+   } else if (metric 

[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112304098
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.RequestBody;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Http client talking to Datadog
+ * */
+public class DatadogHttpClient{
+   private static final String SERIES_URL_FORMAT = 
"https://app.datadoghq.com/api/v1/series?api_key=%s";
+   private static final String VALIDATE_URL_FORMAT = 
"https://app.datadoghq.com/api/v1/validate?api_key=%s";
+   private static final MediaType MEDIA_TYPE = 
MediaType.parse("application/json; charset=utf-8");
+   private static final int TIMEOUT = 5;
+
+   private final String seriesUrl;
+   private final String validateUrl;
+   private final OkHttpClient client;
+   private final String apiKey;
+
+   public DatadogHttpClient(String dgApiKey) {
+   if(dgApiKey == null || dgApiKey.isEmpty()) {
+   throw new IllegalArgumentException(
+   "Invalid API key:" + dgApiKey);
+   }
+   apiKey = dgApiKey;
+   client = new OkHttpClient.Builder()
+   .connectTimeout(TIMEOUT, TimeUnit.SECONDS)
+   .writeTimeout(TIMEOUT, TimeUnit.SECONDS)
+   .readTimeout(TIMEOUT, TimeUnit.SECONDS)
+   .build();
+
+   seriesUrl = String.format(SERIES_URL_FORMAT, apiKey);
+   validateUrl = String.format(VALIDATE_URL_FORMAT, apiKey);
+   validateApiKey();
+   }
+
+   private void validateApiKey() {
+   Request r = new 
Request.Builder().url(validateUrl).get().build();
+
+   try {
+   Response response = client.newCall(r).execute();
+   if(!response.isSuccessful()) {
--- End diff --

The response can fail for reasons other than the key being invalid, no? If 
so, the exception message should state "Failed to validate API key. ".


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout

2017-04-19 Thread Seth Wiesman (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975434#comment-15975434
 ] 

Seth Wiesman commented on FLINK-6315:
-

[~StephanEwen] 

So I guess this is the crux of the real issue: eventually consistent file 
systems cannot perform renames, only PUTs. If one checkpoint times out, the 
next needs to re-PUT its files into the next valid bucket. 

I had considered something similar to what you are doing with incremental 
checkpointing: if checkpoint 3 arrives before 2 finishes, then build on 
checkpoint 1. The reason I backed away from it is that in the degenerate case 
where no checkpoint ever completes before the next begins, no buckets would be 
committed and no data would ever be deleted from the buffer. Is this a case you 
deal with for incremental checkpointing, or is checkpointing that frequently 
considered an anti-pattern and not dealt with? 

Of course the easy solution would be to say that the `EventualyConsistentSink` 
only provides exactly-once when `maxConcurrentCheckpoints` is set to 1, but that 
doesn't feel like a satisfactory answer to me. 


> Notify on checkpoint timeout 
> -
>
> Key: FLINK-6315
> URL: https://issues.apache.org/jira/browse/FLINK-6315
> Project: Flink
>  Issue Type: New Feature
>  Components: Core
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>
> A common use case when writing a custom operator that outputs data to some 
> third party location to partially output on checkpoint and then commit on 
> notifyCheckpointComplete. If that external system does not gracefully handle 
> rollbacks (such as Amazon S3 not allowing consistent delete operations) then 
> that data needs to be handled by the next checkpoint. 
> The idea is to add a new interface similar to CheckpointListener that 
> provides a callback when the CheckpointCoordinator timesout a checkpoint
> {code:java}
> /**
>  * This interface must be implemented by functions/operations that want to 
> receive
>  * a notification if a checkpoint has been {@link 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator}
>  */
> public interface CheckpointTimeoutListener {
>   /**
>* This method is called as a notification if a distributed checkpoint 
> has been timed out.
>*
>* @param checkpointId The ID of the checkpoint that has been timed out.
>* @throws Exception
>*/
>   void notifyCheckpointTimeout(long checkpointId) throws Exception;
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112303033
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.datadog.utils.TimestampUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Abstract metric of Datadog for serialization
+ * */
+public abstract class DMetric {
+   private final String metric; // Metric name
+   private final MetricType type;
+   private final List<String> tags;
+
+   public DMetric(MetricType metricType, String metric, List<String> tags) 
{
+   this.type = metricType;
+   this.metric = metric;
+   this.tags = tags;
+   }
+
+   public MetricType getType() {
+   return type;
--- End diff --

what is the purpose of this field from DataDog's perspective? I don't see 
it being mentioned under http://docs.datadoghq.com/api/#metrics-post.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975433#comment-15975433
 ] 

ASF GitHub Bot commented on FLINK-6311:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3738#discussion_r112302446
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -408,12 +408,15 @@ public void runFetcher() throws Exception {
 */
public void shutdownFetcher() {
running = false;
-   mainThread.interrupt(); // the main thread may be sleeping for 
the discovery interval
-
+   if (mainThread != null) {
+   mainThread.interrupt(); // the main thread may be 
sleeping for the discovery interval
+   }
if (LOG.isInfoEnabled()) {
LOG.info("Shutting down the shard consumer threads of 
subtask {} ...", indexOfThisConsumerSubtask);
}
-   shardConsumersExecutor.shutdownNow();
+   if (shardConsumersExecutor != null) {
--- End diff --

Actually I think this is a redundant `null` check, because 
`shardConsumersExecutor` is final.
It should never be null, so this null check might actually be confusing to 
other readers.


> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: mingleizhang
>
> This was reported by an user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected 
> against the condition when the source was closed before it started running. 
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3738: [FLINK-6311] [Kinesis Connector] NPE in FlinkKines...

2017-04-19 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3738#discussion_r112302446
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -408,12 +408,15 @@ public void runFetcher() throws Exception {
 */
public void shutdownFetcher() {
running = false;
-   mainThread.interrupt(); // the main thread may be sleeping for 
the discovery interval
-
+   if (mainThread != null) {
+   mainThread.interrupt(); // the main thread may be 
sleeping for the discovery interval
+   }
if (LOG.isInfoEnabled()) {
LOG.info("Shutting down the shard consumer threads of 
subtask {} ...", indexOfThisConsumerSubtask);
}
-   shardConsumersExecutor.shutdownNow();
+   if (shardConsumersExecutor != null) {
--- End diff --

Actually I think this is a redundant `null` check, because 
`shardConsumersExecutor` is final.
It should never be null, so this null check might actually be confusing to 
other readers.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112301573
  
--- Diff: flink-metrics/flink-metrics-datadog/pom.xml ---
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-metrics</artifactId>
+		<version>1.3-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-metrics-datadog</artifactId>
+	<version>1.3-SNAPSHOT</version>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.squareup.okhttp3</groupId>
+			<artifactId>okhttp</artifactId>
+			<version>3.6.0</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.squareup.okio</groupId>
+			<artifactId>okio</artifactId>
+			<version>1.11.0</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.fasterxml.jackson.core</groupId>
+			<artifactId>jackson-databind</artifactId>
--- End diff --

You should be able to set this to provided.
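
I.e. roughly (the version property is only illustrative):

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>${jackson.version}</version>
        <scope>provided</scope>
    </dependency>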


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112301542
  
--- Diff: flink-metrics/flink-metrics-datadog/pom.xml ---
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-metrics</artifactId>
+		<version>1.3-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-metrics-datadog</artifactId>
+	<version>1.3-SNAPSHOT</version>
+
--- End diff --

You should create a fat jar using the maven-shade-plugin. If the okhttp and 
okio dependencies have significant transitive dependencies themselves, those 
should be shaded as well.
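
A sketch of what that could look like (the relocation package names are 
just a suggestion):

    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <executions>
            <execution>
                <phase>package</phase>
                <goals>
                    <goal>shade</goal>
                </goals>
                <configuration>
                    <relocations>
                        <relocation>
                            <pattern>okhttp3</pattern>
                            <shadedPattern>org.apache.flink.shaded.okhttp3</shadedPattern>
                        </relocation>
                        <relocation>
                            <pattern>okio</pattern>
                            <shadedPattern>org.apache.flink.shaded.okio</shadedPattern>
                        </relocation>
                    </relocations>
                </configuration>
            </execution>
        </executions>
    </plugin>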


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6008) collection of BlobServer improvements

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975416#comment-15975416
 ] 

ASF GitHub Bot commented on FLINK-6008:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/3512
  
Found a race between `BlobCache#deleteAll(JobID)` and 
`BlobCache#getURL(BlobKey)` now that the former is actually being used - this 
needs to be fixed first before merging:

`BlobCache#deleteAll(JobID)` deletes the job directory which is only 
created at the start of `BlobCache#getURL(BlobKey)` which then relies on the 
directory being present.


> collection of BlobServer improvements
> -
>
> Key: FLINK-6008
> URL: https://issues.apache.org/jira/browse/FLINK-6008
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.3.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> The following things should be improved around the BlobServer/BlobCache:
> * update config uptions with non-deprecated ones, e.g. 
> {{high-availability.cluster-id}} and {{high-availability.storageDir}}
> * promote {{BlobStore#deleteAll(JobID)}} to the {{BlobService}}
> * extend the {{BlobService}} to work with {{NAME_ADDRESSABLE}} blobs 
> (prepares FLINK-4399]
> * remove {{NAME_ADDRESSABLE}} blobs after job/task termination
> * do not fail the {{BlobServer}} when a delete operation fails
> * code style, like using {{Preconditions.checkArgument}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3512: [FLINK-6008] collection of BlobServer improvements

2017-04-19 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/3512
  
Found a race between `BlobCache#deleteAll(JobID)` and 
`BlobCache#getURL(BlobKey)` now that the former is actually being used - this 
needs to be fixed first before merging:

`BlobCache#deleteAll(JobID)` deletes the job directory which is only 
created at the start of `BlobCache#getURL(BlobKey)` which then relies on the 
directory being present.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975415#comment-15975415
 ] 

ASF GitHub Bot commented on FLINK-6311:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3738
  
LGTM once Stephan's comment on making the `mainThread` variable `volatile` 
is addressed.
Could you do that @zhangminglei?
Once updated I'll proceed to merge this, thanks :)
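
For reference, the suggested change is a one-line sketch in 
KinesisDataFetcher:

    // volatile, because shutdownFetcher() may run on a different thread
    // than runFetcher(), which assigns this field
    private volatile Thread mainThread;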


> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: mingleizhang
>
> This was reported by an user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected 
> against the condition when the source was closed before it started running. 
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3738: [FLINK-6311] [Kinesis Connector] NPE in FlinkKinesisConsu...

2017-04-19 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3738
  
LGTM once Stephan's comment on making the `mainThread` variable `volatile` 
is addressed.
Could you do that @zhangminglei?
Once updated I'll proceed to merge this, thanks :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112300620
  
--- Diff: docs/monitoring/metrics.md ---
@@ -436,6 +436,35 @@ metrics.reporter.stsd.port: 8125
 
 {% endhighlight %}
 
+### Datadog (org.apache.flink.metrics.datadog.DatadogHttpReporter)
+
+In order to use this reporter you must copy 
`/opt/flink-metrics-datadog-{{site.version}}.jar` into the `/lib` folder
+of your Flink distribution.
+
+Parameters:
+
+- `apikey` - the Datadog API key
+- `tags` - (optional) the global tags that will be applied to metrics when 
sending to Datadog. Tags should be separated by commas only
+
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporters: dghttp
+metrics.reporter.dghttp.class: 
org.apache.flink.metrics.datadog.DatadogHttpReporter
+metrics.reporter.dghttp.apikey: xxx
+metrics.reporter.dghttp.tags: myflinkapp,prod
+
+// <host>, <job_name>, <tm_id>, <job_id>, <task_name>, <subtask_index> 
 will be sent to Datadog as tags
+metrics.scope.jm: <host>.jobmanager
--- End diff --

this section seems unnecessary


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112300266
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.datadog.utils.SerializationUtils;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Metric Reporter for Datadog
+ *
+ * Variables in metrics scope will be sent to Datadog as tags
+ * */
+public class DatadogHttpReporter implements MetricReporter, 
CharacterFilter, Scheduled {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(DatadogHttpReporter.class);
+
+   // Both Flink's Gauge and Meter values are taken as gauge in Datadog
+   private final Map<Gauge, DGauge> gauges = new ConcurrentHashMap<>();
+   private final Map<Counter, DCounter> counters = new 
ConcurrentHashMap<>();
+   private final Map<Meter, DMeter> meters = new ConcurrentHashMap<>();
+
+   private DatadogHttpClient client;
+   private List<String> configTags;
+
+   public static final String API_KEY = "apikey";
+   public static final String TAGS = "tags";
+
+   @Override
+   public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
+   final String name = group.getMetricIdentifier(metricName, this);
+
+   List<String> tags = new ArrayList<>(configTags);
+   tags.addAll(getTagsFromMetricGroup(group));
+
+   synchronized (this) {
+   if (metric instanceof Counter) {
+   Counter c = (Counter) metric;
+   counters.put(c, new DCounter(c, name, tags));
+   } else if (metric instanceof Gauge) {
+   Gauge g = (Gauge) metric;
+   gauges.put(g, new DGauge(g, name, tags));
+   } else if (metric instanceof Meter) {
+   Meter m = (Meter) metric;
+   // Only consider rate
+   meters.put(m, new DMeter(m, name, tags));
+   } else if (metric instanceof Histogram) {
+   LOGGER.warn("Cannot add {} because Datadog HTTP 
API doesn't support Histogram", metricName);
+   } else {
+   LOGGER.warn("Cannot add unknown metric type {}. 
This indicates that the reporter " +
+   "does not support this metric type.", 
metric.getClass().getName());
+   }
+   }
+   }
+
+   @Override
+   public void notifyOfRemovedMetric(Metric metric, String metricName, 
MetricGroup group) {
+   synchronized (this) {
+   if (metric instanceof Counter) {
+   counters.remove(metric);
+   } else if (metric instanceof Gauge) {
+   gauges.remove(metric);
+   } else if (metric instanceof Meter) {
+   meters.remove(metric);
+   } else if (metric 

[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112300022
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpReporterTests.java
 ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class DatadogHttpReporterTests {
+   private static DatadogHttpReporter reporter;
+
+   @BeforeClass
--- End diff --

if there's only a single test we don't need a @BeforeClass method nor a 
static field; just introduce a local variable.
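
A minimal sketch of that simplification (assuming JUnit 4, as in the quoted 
test; illustrative, not the PR's final code):

```java
public class DatadogHttpReporterTests {

	@Test
	public void testFilterChars() {
		// local variable replaces the static field and the @BeforeClass initializer
		DatadogHttpReporter reporter = new DatadogHttpReporter();

		assertEquals("", reporter.filterCharacters(""));
		assertEquals("abc", reporter.filterCharacters("abc"));
	}
}
```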


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112299684
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpReporterTests.java
 ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class DatadogHttpReporterTests {
+   private static DatadogHttpReporter reporter;
+
+   @BeforeClass
+   public static void init() {
+   reporter = new DatadogHttpReporter();
+   }
+
+   @Test
+   public void testFilterChars() {
+   assertEquals("", reporter.filterCharacters(""));
+assertEquals("abc", reporter.filterCharacters("abc"));
--- End diff --

missing indentation


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112299390
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.RequestBody;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Http client talking to Datadog
+ * */
+public class DatadogHttpClient {
+   private static final String SERIES_URL_FORMAT = 
"https://app.datadoghq.com/api/v1/series?api_key=%s";
+   private static final String VALIDATE_URL_FORMAT = 
"https://app.datadoghq.com/api/v1/validate?api_key=%s";
+   private static final MediaType MEDIA_TYPE = 
MediaType.parse("application/json; charset=utf-8");
+   private static final int TIMEOUT = 5;
+
+   private final String seriesUrl;
+   private final String validateUrl;
+   private final OkHttpClient client;
--- End diff --

you may want to add a close() method that shuts down the OkHttpClient as 
described in 
https://github.com/square/okhttp/blob/master/okhttp/src/main/java/okhttp3/OkHttpClient.java
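
A sketch of such a close() method, following the shutdown steps from the 
OkHttp documentation linked above (OkHttp 3.x API; a suggestion, not code 
from the PR):

```java
public void close() {
	// shut down the dispatcher's executor so its non-daemon threads exit promptly
	client.dispatcher().executorService().shutdown();
	// evict all idle connections from the shared pool
	client.connectionPool().evictAll();
}
```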


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112298655
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.Meter;
+
+import java.util.List;
+
+/**
+ * Mapping of meter between Flink and Datadog
+ *
+ * Only consider rate of the meter, due to Datadog HTTP API's limited 
support of meter
+ * */
+public class DMeter extends DMetric {
+   private final Meter meter;
--- End diff --

The indentation in this class appears to be different from that in the others.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112298482
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.datadog.utils.SerializationUtils;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Metric Reporter for Datadog
+ *
+ * Variables in metrics scope will be sent to Datadog as tags
+ * */
+public class DatadogHttpReporter implements MetricReporter, 
CharacterFilter, Scheduled {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(DatadogHttpReporter.class);
+
+   // Both Flink's Gauge and Meter values are taken as gauge in Datadog
+   private final Map<Gauge, DGauge> gauges = new ConcurrentHashMap<>();
+   private final Map<Counter, DCounter> counters = new 
ConcurrentHashMap<>();
+   private final Map<Meter, DMeter> meters = new ConcurrentHashMap<>();
+
+   private DatadogHttpClient client;
+   private List<String> configTags;
+
+   public static final String API_KEY = "apikey";
+   public static final String TAGS = "tags";
+
+   @Override
+   public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
+   final String name = group.getMetricIdentifier(metricName, this);
+
+   List<String> tags = new ArrayList<>(configTags);
+   tags.addAll(getTagsFromMetricGroup(group));
+
+   synchronized (this) {
+   if (metric instanceof Counter) {
+   Counter c = (Counter) metric;
+   counters.put(c, new DCounter(c, name, tags));
+   } else if (metric instanceof Gauge) {
+   Gauge g = (Gauge) metric;
+   gauges.put(g, new DGauge(g, name, tags));
+   } else if (metric instanceof Meter) {
+   Meter m = (Meter) metric;
+   // Only consider rate
+   meters.put(m, new DMeter(m, name, tags));
+   } else if (metric instanceof Histogram) {
+   LOGGER.warn("Cannot add {} because Datadog HTTP 
API doesn't support Histogram", metricName);
+   } else {
+   LOGGER.warn("Cannot add unknown metric type {}. 
This indicates that the reporter " +
+   "does not support this metric type.", 
metric.getClass().getName());
+   }
+   }
+   }
+
+   @Override
+   public void notifyOfRemovedMetric(Metric metric, String metricName, 
MetricGroup group) {
+   synchronized (this) {
+   if (metric instanceof Counter) {
+   counters.remove(metric);
+   } else if (metric instanceof Gauge) {
+   gauges.remove(metric);
+   } else if (metric instanceof Meter) {
+   meters.remove(metric);
+   } else if (metric 

[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112298348
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.datadog.utils.SerializationUtils;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Metric Reporter for Datadog
+ *
+ * Variables in metrics scope will be sent to Datadog as tags
+ * */
+public class DatadogHttpReporter implements MetricReporter, 
CharacterFilter, Scheduled {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(DatadogHttpReporter.class);
+
+   // Both Flink's Gauge and Meter values are taken as gauge in Datadog
+   private final Map<Gauge, DGauge> gauges = new ConcurrentHashMap<>();
+   private final Map<Counter, DCounter> counters = new 
ConcurrentHashMap<>();
+   private final Map<Meter, DMeter> meters = new ConcurrentHashMap<>();
+
+   private DatadogHttpClient client;
+   private List<String> configTags;
+
+   public static final String API_KEY = "apikey";
+   public static final String TAGS = "tags";
+
+   @Override
+   public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
+   final String name = group.getMetricIdentifier(metricName, this);
+
+   List<String> tags = new ArrayList<>(configTags);
+   tags.addAll(getTagsFromMetricGroup(group));
+
+   synchronized (this) {
+   if (metric instanceof Counter) {
+   Counter c = (Counter) metric;
+   counters.put(c, new DCounter(c, name, tags));
+   } else if (metric instanceof Gauge) {
+   Gauge g = (Gauge) metric;
+   gauges.put(g, new DGauge(g, name, tags));
--- End diff --

You will run into `ClassCastExceptions` here. You have to check whether the 
Gauge returns a number before creating the `DGauge`.
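
A hedged sketch of the guard being asked for here: only wrap the gauge if its 
value is actually a Number, since a Flink `Gauge` may return any type (only 
the relevant branch of notifyOfAddedMetric is shown; the warning text is 
illustrative):

```java
} else if (metric instanceof Gauge) {
	Gauge gauge = (Gauge) metric;
	if (gauge.getValue() instanceof Number) {
		gauges.put(gauge, new DGauge(gauge, name, tags));
	} else {
		LOGGER.warn("Cannot add gauge {}: its value is not a Number.", name);
	}
}
```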


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112298056
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.datadog.utils.SerializationUtils;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Metric Reporter for Datadog
+ *
+ * Variables in metrics scope will be sent to Datadog as tags
+ * */
+public class DatadogHttpReporter implements MetricReporter, 
CharacterFilter, Scheduled {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(DatadogHttpReporter.class);
+
+   // Both Flink's Gauge and Meter values are taken as gauge in Datadog
+   private final Map<Gauge, DGauge> gauges = new ConcurrentHashMap<>();
+   private final Map<Counter, DCounter> counters = new 
ConcurrentHashMap<>();
+   private final Map<Meter, DMeter> meters = new ConcurrentHashMap<>();
+
+   private DatadogHttpClient client;
+   private List<String> configTags;
+
+   public static final String API_KEY = "apikey";
+   public static final String TAGS = "tags";
+
+   @Override
+   public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
+   final String name = group.getMetricIdentifier(metricName, this);
+
+   List<String> tags = new ArrayList<>(configTags);
+   tags.addAll(getTagsFromMetricGroup(group));
+
+   synchronized (this) {
--- End diff --

No need to synchronize since you use a `ConcurrentHashMap`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112297785
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.datadog.utils.SerializationUtils;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Metric Reporter for Datadog
+ *
+ * Variables in metrics scope will be sent to Datadog as tags
+ * */
+public class DatadogHttpReporter implements MetricReporter, 
CharacterFilter, Scheduled {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(DatadogHttpReporter.class);
+
+   // Both Flink's Gauge and Meter values are taken as gauge in Datadog
+   private final Map<Gauge, DGauge> gauges = new ConcurrentHashMap<>();
+   private final Map<Counter, DCounter> counters = new 
ConcurrentHashMap<>();
+   private final Map<Meter, DMeter> meters = new ConcurrentHashMap<>();
+
+   private DatadogHttpClient client;
+   private List<String> configTags;
+
+   public static final String API_KEY = "apikey";
+   public static final String TAGS = "tags";
+
+   @Override
+   public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
+   final String name = group.getMetricIdentifier(metricName, this);
--- End diff --

Since you don't filter characters anyway you can use 
`MetricGroup#getMetricIdentifier(String name)`. Then you also don't need to 
implement `CharacterFilter`.
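
In code, the simplification would be just (sketch):

```java
// one-argument overload; no CharacterFilter implementation required
final String name = group.getMetricIdentifier(metricName);
```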


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112296701
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.datadog.utils.TimestampUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Abstract metric of Datadog for serialization
+ * */
+public abstract class DMetric {
+   private final String metric; // Metric name
--- End diff --

So this is a requirement by datadog then? Which expects the name field to 
be called "metric"?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout

2017-04-19 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975318#comment-15975318
 ] 

Aljoscha Krettek commented on FLINK-6315:
-

In the {{BucketingSink}}, we move "pending" files to "final" when we get a 
checkpoint-complete notification. When restoring (from a failure or from a 
savepoint) we also move to "final" any pending files that were confirmed by a 
checkpoint but not yet moved, because a failure occurred before the completion 
notification was received.

TL;DR
Either the notify or a restore does the move from "pending" to "final".
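
A simplified sketch of that mechanism (illustrative: the real 
{{BucketingSink}} tracks pending files per checkpoint in its state; 
{{pendingFilesPerCheckpoint}}, {{fs}} and the ".pending" suffix here are 
stand-ins):

{code:java}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
	// files confirmed by this checkpoint are now safe to expose
	List<String> pending = pendingFilesPerCheckpoint.remove(checkpointId);
	if (pending != null) {
		for (String pendingFile : pending) {
			// move from "pending" to "final" by dropping the suffix
			fs.rename(new Path(pendingFile), new Path(pendingFile.replace(".pending", "")));
		}
	}
}
{code}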

> Notify on checkpoint timeout 
> -
>
> Key: FLINK-6315
> URL: https://issues.apache.org/jira/browse/FLINK-6315
> Project: Flink
>  Issue Type: New Feature
>  Components: Core
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>
> A common use case when writing a custom operator that outputs data to some 
> third party location to partially output on checkpoint and then commit on 
> notifyCheckpointComplete. If that external system does not gracefully handle 
> rollbacks (such as Amazon S3 not allowing consistent delete operations) then 
> that data needs to be handled by the next checkpoint. 
> The idea is to add a new interface similar to CheckpointListener that 
> provides a callback when the CheckpointCoordinator timesout a checkpoint
> {code:java}
> /**
>  * This interface must be implemented by functions/operations that want to 
> receive
>  * a notification if a checkpoint has been timed out by the {@link 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator}
>  */
> public interface CheckpointTimeoutListener {
>   /**
>* This method is called as a notification if a distributed checkpoint 
> has been timed out.
>*
>* @param checkpointId The ID of the checkpoint that has been timed out.
>* @throws Exception
>*/
>   void notifyCheckpointTimeout(long checkpointId) throws Exception;
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3736: [Flink-6013][metrics] Add Datadog HTTP metrics reporter

2017-04-19 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/3736
  
Addressed @zentol and @StephanEwen 's comments. Ready for another round!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (FLINK-3328) Incorrectly shaded dependencies in flink-runtime

2017-04-19 Thread Luke Hutchison (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975268#comment-15975268
 ] 

Luke Hutchison edited comment on FLINK-3328 at 4/19/17 6:43 PM:


[~StephanEwen] It's only not a problem if you know that in every case, the 
version of every single shared library will be the same between all Flink jars. 
And even if you know that will be the case, it's hardly ideal space-wise, 
because you end up pulling in the same dependencies multiple times for every 
Flink project, until a fat jar is built. In general, there is a good reason for 
that Maven warning, because if there's any way that multiple different versions 
of a dep can occur, you can trigger bugs that are maddeningly hard to trace.

If every Flink jar (like {{flink-runtime}}) depends on one of the core Flink 
jars (e.g. {{flink-core}}), couldn't all the shared dependencies just be put in 
that one jar?



was (Author: lukehutch):
[~StephanEwen] It's only not a problem if you know that in every case, the 
version of every single shared library will be the same between all Flink jars. 
And even if you know that will be the case, it's hardly ideal space-wise, 
because you end up pulling in the same dependencies multiple times for every 
Flink project, until a fat jar is built. In general, there is a good reason for 
that Maven warning, because if there's any way that multiple different versions 
of a dep can occur, you can trigger bugs that are maddeningly hard to trace.

If every Flink jar (like `flink-runtime`) depends on one of the core Flink jars 
(e.g. `flink-core`), couldn't all the shared dependencies just be put in that 
one jar?


> Incorrectly shaded dependencies in flink-runtime
> 
>
> Key: FLINK-3328
> URL: https://issues.apache.org/jira/browse/FLINK-3328
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Robert Metzger
>Priority: Blocker
> Fix For: 1.0.0
>
>
> There are apparently some dependencies shaded into {{flink-runtime}} fat jar 
> that are not relocated. (the flink-runtime jar is now 70 MB)
> From the output of the shading in flink-dist, it looks as if this concerns at 
> least
>   - Zookeeper
>   - slf4j
>   - jline
>   - netty (3.x)
> Possibly more.
> {code}
> [WARNING] zookeeper-3.4.6.jar, flink-runtime_2.10-1.0-SNAPSHOT.jar define 440 
> overlapping classes: 
> [WARNING]   - org.apache.zookeeper.server.NettyServerCnxnFactory
> [WARNING]   - org.apache.jute.compiler.JFile
> [WARNING]   - org.apache.zookeeper.server.SessionTracker$Session
> [WARNING]   - org.apache.zookeeper.server.quorum.AuthFastLeaderElection$1
> [WARNING]   - org.apache.jute.compiler.JLong
> [WARNING]   - org.apache.zookeeper.client.ZooKeeperSaslClient$SaslState
> [WARNING]   - org.apache.zookeeper.server.auth.KerberosName$Rule
> [WARNING]   - org.apache.jute.CsvOutputArchive
> [WARNING]   - org.apache.zookeeper.server.quorum.QuorumPeer
> [WARNING]   - org.apache.zookeeper.ZooKeeper$DataWatchRegistration
> [WARNING]   - 430 more...
> [WARNING] slf4j-api-1.7.7.jar, flink-runtime_2.10-1.0-SNAPSHOT.jar define 24 
> overlapping classes: 
> [WARNING]   - org.slf4j.spi.MarkerFactoryBinder
> [WARNING]   - org.slf4j.helpers.SubstituteLogger
> [WARNING]   - org.slf4j.helpers.BasicMarker
> [WARNING]   - org.slf4j.helpers.Util
> [WARNING]   - org.slf4j.LoggerFactory
> [WARNING]   - org.slf4j.Marker
> [WARNING]   - org.slf4j.helpers.NamedLoggerBase
> [WARNING]   - org.slf4j.Logger
> [WARNING]   - org.slf4j.spi.LocationAwareLogger
> [WARNING]   - org.slf4j.ILoggerFactory
> [WARNING]   - 14 more...
> [WARNING] jansi-1.4.jar, jline-2.10.4.jar define 23 overlapping classes: 
> [WARNING]   - org.fusesource.jansi.Ansi$Erase
> [WARNING]   - org.fusesource.jansi.Ansi
> [WARNING]   - org.fusesource.jansi.AnsiOutputStream
> [WARNING]   - org.fusesource.jansi.internal.CLibrary
> [WARNING]   - org.fusesource.jansi.Ansi$2
> [WARNING]   - org.fusesource.jansi.WindowsAnsiOutputStream
> [WARNING]   - org.fusesource.jansi.AnsiRenderer$Code
> [WARNING]   - org.fusesource.jansi.AnsiConsole
> [WARNING]   - org.fusesource.jansi.Ansi$Attribute
> [WARNING]   - org.fusesource.jansi.internal.Kernel32
> [WARNING]   - 13 more...
> [WARNING] commons-beanutils-core-1.8.0.jar, commons-collections-3.2.2.jar, 
> commons-beanutils-1.7.0.jar define 10 overlapping classes: 
> [WARNING]   - org.apache.commons.collections.FastHashMap$EntrySet
> [WARNING]   - org.apache.commons.collections.ArrayStack
> [WARNING]   - org.apache.commons.collections.FastHashMap$1
> [WARNING]   - org.apache.commons.collections.FastHashMap$KeySet
> [WARNING]   - org.apache.commons.collections.FastHashMap$CollectionView
> [WARNING]   - 

[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112283099
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/metric/DGauge.java
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog.metric;
+
+import java.util.List;
+
+/**
+ * Mapping of gauge between Flink and Datadog
+ * */
+public class DGauge extends DMetric {
--- End diff --

yes, see my explanation below


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3328) Incorrectly shaded dependencies in flink-runtime

2017-04-19 Thread Luke Hutchison (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975268#comment-15975268
 ] 

Luke Hutchison commented on FLINK-3328:
---

[~StephanEwen] It's only not a problem if you know that in every case, the 
version of every single shared library will be the same between all Flink jars. 
And even if you know that will be the case, it's hardly ideal space-wise, 
because you end up pulling in the same dependencies multiple times for every 
Flink project, until a fat jar is built. In general, there is a good reason for 
that Maven warning, because if there's any way that multiple different versions 
of a dep can occur, you can trigger bugs that are maddeningly hard to trace.

If every Flink jar (like `flink-runtime`) depends on one of the core Flink jars 
(e.g. `flink-core`), couldn't all the shared dependencies just be put in that 
one jar?


> Incorrectly shaded dependencies in flink-runtime
> 
>
> Key: FLINK-3328
> URL: https://issues.apache.org/jira/browse/FLINK-3328
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Robert Metzger
>Priority: Blocker
> Fix For: 1.0.0
>
>
> There are apparently some dependencies shaded into {{flink-runtime}} fat jar 
> that are not relocated. (the flink-runtime jar is now 70 MB)
> From the output of the shading in flink-dist, it looks as if this concerns at 
> least
>   - Zookeeper
>   - slf4j
>   - jline
>   - netty (3.x)
> Possibly more.
> {code}
> [WARNING] zookeeper-3.4.6.jar, flink-runtime_2.10-1.0-SNAPSHOT.jar define 440 
> overlapping classes: 
> [WARNING]   - org.apache.zookeeper.server.NettyServerCnxnFactory
> [WARNING]   - org.apache.jute.compiler.JFile
> [WARNING]   - org.apache.zookeeper.server.SessionTracker$Session
> [WARNING]   - org.apache.zookeeper.server.quorum.AuthFastLeaderElection$1
> [WARNING]   - org.apache.jute.compiler.JLong
> [WARNING]   - org.apache.zookeeper.client.ZooKeeperSaslClient$SaslState
> [WARNING]   - org.apache.zookeeper.server.auth.KerberosName$Rule
> [WARNING]   - org.apache.jute.CsvOutputArchive
> [WARNING]   - org.apache.zookeeper.server.quorum.QuorumPeer
> [WARNING]   - org.apache.zookeeper.ZooKeeper$DataWatchRegistration
> [WARNING]   - 430 more...
> [WARNING] slf4j-api-1.7.7.jar, flink-runtime_2.10-1.0-SNAPSHOT.jar define 24 
> overlapping classes: 
> [WARNING]   - org.slf4j.spi.MarkerFactoryBinder
> [WARNING]   - org.slf4j.helpers.SubstituteLogger
> [WARNING]   - org.slf4j.helpers.BasicMarker
> [WARNING]   - org.slf4j.helpers.Util
> [WARNING]   - org.slf4j.LoggerFactory
> [WARNING]   - org.slf4j.Marker
> [WARNING]   - org.slf4j.helpers.NamedLoggerBase
> [WARNING]   - org.slf4j.Logger
> [WARNING]   - org.slf4j.spi.LocationAwareLogger
> [WARNING]   - org.slf4j.ILoggerFactory
> [WARNING]   - 14 more...
> [WARNING] jansi-1.4.jar, jline-2.10.4.jar define 23 overlapping classes: 
> [WARNING]   - org.fusesource.jansi.Ansi$Erase
> [WARNING]   - org.fusesource.jansi.Ansi
> [WARNING]   - org.fusesource.jansi.AnsiOutputStream
> [WARNING]   - org.fusesource.jansi.internal.CLibrary
> [WARNING]   - org.fusesource.jansi.Ansi$2
> [WARNING]   - org.fusesource.jansi.WindowsAnsiOutputStream
> [WARNING]   - org.fusesource.jansi.AnsiRenderer$Code
> [WARNING]   - org.fusesource.jansi.AnsiConsole
> [WARNING]   - org.fusesource.jansi.Ansi$Attribute
> [WARNING]   - org.fusesource.jansi.internal.Kernel32
> [WARNING]   - 13 more...
> [WARNING] commons-beanutils-core-1.8.0.jar, commons-collections-3.2.2.jar, 
> commons-beanutils-1.7.0.jar define 10 overlapping classes: 
> [WARNING]   - org.apache.commons.collections.FastHashMap$EntrySet
> [WARNING]   - org.apache.commons.collections.ArrayStack
> [WARNING]   - org.apache.commons.collections.FastHashMap$1
> [WARNING]   - org.apache.commons.collections.FastHashMap$KeySet
> [WARNING]   - org.apache.commons.collections.FastHashMap$CollectionView
> [WARNING]   - org.apache.commons.collections.BufferUnderflowException
> [WARNING]   - org.apache.commons.collections.Buffer
> [WARNING]   - 
> org.apache.commons.collections.FastHashMap$CollectionView$CollectionViewIterator
> [WARNING]   - org.apache.commons.collections.FastHashMap$Values
> [WARNING]   - org.apache.commons.collections.FastHashMap
> [WARNING] flink-streaming-scala_2.10-1.0-SNAPSHOT.jar, 
> flink-core-1.0-SNAPSHOT.jar, flink-runtime_2.10-1.0-SNAPSHOT.jar, 
> flink-java-1.0-SNAPSHOT.jar, flink-streaming-java_2.10-1.0-SNAPSHOT.jar, 
> flink-scala_2.10-1.0-SNAPSHOT.jar, flink-clients_2.10-1.0-SNAPSHOT.jar, 
> flink-optimizer_2.10-1.0-SNAPSHOT.jar, 
> flink-runtime-web_2.10-1.0-SNAPSHOT.jar define 1690 overlapping classes: 
> [WARNING]   - 
> org.apache.flink.shaded.com.google.common.collect.LinkedListMultimap
> [WARNING] 

[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112282965
  
--- Diff: flink-metrics/flink-metrics-datadog/pom.xml ---
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<modelVersion>4.0.0</modelVersion>
+
+   <parent>
+   <groupId>org.apache.flink</groupId>
+   <artifactId>flink-metrics</artifactId>
+   <version>1.3-SNAPSHOT</version>
+   <relativePath>..</relativePath>
+   </parent>
+
+<groupId>org.apache.flink</groupId>
+<artifactId>flink-metrics-datadog</artifactId>
+<version>1.3-SNAPSHOT</version>
+
+   <dependencies>
+   <dependency>
+   <groupId>org.apache.flink</groupId>
+   <artifactId>flink-metrics-core</artifactId>
+   <version>${project.version}</version>
+   <scope>provided</scope>
+   </dependency>
+
+   <dependency>
+   <groupId>com.google.guava</groupId>
--- End diff --

I removed guava dependency as Stephan proposed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6103) LocalFileSystem rename() uses File.renameTo()

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975260#comment-15975260
 ] 

ASF GitHub Bot commented on FLINK-6103:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3598
  
I would do the following:
  - log nothing
  - Catch the errors that are regular "move failed" exceptions and return 
false.
 - `FileNotFoundException`
 - `DirectoryNotEmptyException`
 - `SecurityException`
  - Let all other `IOExceptions` bubble out



> LocalFileSystem rename() uses File.renameTo()
> -
>
> Key: FLINK-6103
> URL: https://issues.apache.org/jira/browse/FLINK-6103
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Reporter: Flavio Pompermaier
>Assignee: Flavio Pompermaier
>  Labels: filesystem
>
> I've tried to move a directory to another on the LocalFilesystem and it 
> doesn't work (in my case fs is an instance of java.io.UnixFileSystem).
> As for Flink-1840 (there was a PR to fix the issue - 
> https://github.com/apache/flink/pull/578) the problem is that 
> {{File.renameTo()}} is not reliable.
> Indeed, the Javadoc says:
> bq. Renames the file denoted by this abstract pathname. Many aspects of the 
> behavior of this method are inherently platform-dependent: The rename 
> operation might not be able to move a file from one filesystem to another, it 
> might not be atomic, and it might not succeed if a file with the destination 
> abstract pathname already exists. The return value should always be checked 
> to make sure that the rename operation was successful. Note that the 
> java.nio.file.Files class defines the move method to move or rename a file in 
> a platform independent manner



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3598: [FLINK-6103] LocalFileSystem rename() uses File.renameTo(...

2017-04-19 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3598
  
I would do the following:
  - log nothing
  - Catch the errors that are regular "move failed" exceptions and return 
false.
 - `FileNotFoundException`
 - `DirectoryNotEmptyException`
 - `SecurityException`
  - Let all other `IOExceptions` bubble out
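
A minimal sketch of that handling, assuming the rename is implemented with 
`java.nio.file.Files.move` (`NoSuchFileException` is the NIO analogue of 
`FileNotFoundException`; the method shape is illustrative, not the actual 
Flink code; imports from `java.io` and `java.nio.file` are assumed):

```java
public boolean rename(File src, File dst) throws IOException {
	try {
		Files.move(src.toPath(), dst.toPath());
		return true;
	} catch (FileNotFoundException | NoSuchFileException
			| DirectoryNotEmptyException | SecurityException e) {
		// a regular "move failed" case: log nothing, just report failure
		return false;
	}
	// any other IOException bubbles out to the caller
}
```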



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-19 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3709
  
@WangTaoTheTonic The purpose of the cache is to reduce queries to the 
JobManager; and since the state of the job is available through the 
ExecutionGraph the cache still fulfills its purpose.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6295) use LoadingCache instead of WeakHashMap to lower latency

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975259#comment-15975259
 ] 

ASF GitHub Bot commented on FLINK-6295:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3709
  
@WangTaoTheTonic The purpose of the cache is to reduce queries to the 
JobManager; and since the state of the job is available through the 
ExecutionGraph the cache still fulfills its purpose.


> use LoadingCache instead of WeakHashMap to lower latency
> 
>
> Key: FLINK-6295
> URL: https://issues.apache.org/jira/browse/FLINK-6295
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Tao Wang
>Assignee: Tao Wang
>
> Now in ExecutionGraphHolder, which is used in many handlers, we use a 
> WeakHashMap to cache ExecutionGraph(s), which is only sensitive to garbage 
> collection.
> The latency is too high when the JVM does GC rarely, which can leave the status 
> of jobs or their tasks out of sync with the real ones.
> LoadingCache is a commonly used cache implementation from the Guava lib; we can 
> use its time-based eviction to lower the latency of status updates.
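
A generic sketch of the time-based eviction the issue proposes (Guava 
CacheBuilder API; the types and loader body are illustrative, not the actual 
ExecutionGraphHolder code):

{code:java}
LoadingCache<JobID, ExecutionGraph> cache = CacheBuilder.newBuilder()
	// evict entries on a timer instead of waiting for a GC cycle
	.expireAfterWrite(30, TimeUnit.SECONDS)
	.build(new CacheLoader<JobID, ExecutionGraph>() {
		@Override
		public ExecutionGraph load(JobID jobId) throws Exception {
			// hypothetical fetch from the JobManager on a cache miss
			return requestExecutionGraphFromJobManager(jobId);
		}
	});
{code}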



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112280185
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
 ---
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.datadog.utils.SerializationUtils;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Abbr: dghttp
--- End diff --

This is a reminder for flink-conf.yaml when users choose to use this reporter.
```
metrics.reporters: dghttp
```

I'll add more comment here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3736: [Flink-6013][metrics] Add Datadog HTTP metrics rep...

2017-04-19 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3736#discussion_r112279814
  
--- Diff: 
flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.datadog.utils.TimestampUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Abstract metric of Datadog for serialization
+ * */
+public abstract class DMetric {
+   private final String metric; // Metric name
--- End diff --

'metric' is a field name in serialized json, and Jackson takes the name 
here during serialization. Thus, I can't rename it to something else.
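
If the Java field ever needed a different name, Jackson's `@JsonProperty` 
could keep the JSON key as "metric" (a sketch, not something the PR does):

```java
import com.fasterxml.jackson.annotation.JsonProperty;

public abstract class DMetric {
	@JsonProperty("metric")
	private final String metricName; // still serialized as "metric" for Datadog

	protected DMetric(String metricName) {
		this.metricName = metricName;
	}
}
```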


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6103) LocalFileSystem rename() uses File.renameTo()

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975242#comment-15975242
 ] 

ASF GitHub Bot commented on FLINK-6103:
---

Github user fpompermaier commented on the issue:

https://github.com/apache/flink/pull/3598
  
Of course... but how should I handle them? Should I catch just one exception? 
What should I log?


> LocalFileSystem rename() uses File.renameTo()
> -
>
> Key: FLINK-6103
> URL: https://issues.apache.org/jira/browse/FLINK-6103
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Reporter: Flavio Pompermaier
>Assignee: Flavio Pompermaier
>  Labels: filesystem
>
> I've tried to move a directory to another on the LocalFilesystem and it 
> doesn't work (in my case fs is an instance of java.io.UnixFileSystem).
> As for Flink-1840 (there was a PR to fix the issue - 
> https://github.com/apache/flink/pull/578) the problem is that 
> {{File.renameTo()}} is not reliable.
> Indeed, the Javadoc says:
> bq. Renames the file denoted by this abstract pathname. Many aspects of the 
> behavior of this method are inherently platform-dependent: The rename 
> operation might not be able to move a file from one filesystem to another, it 
> might not be atomic, and it might not succeed if a file with the destination 
> abstract pathname already exists. The return value should always be checked 
> to make sure that the rename operation was successful. Note that the 
> java.nio.file.Files class defines the move method to move or rename a file in 
> a platform independent manner



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3598: [FLINK-6103] LocalFileSystem rename() uses File.renameTo(...

2017-04-19 Thread fpompermaier
Github user fpompermaier commented on the issue:

https://github.com/apache/flink/pull/3598
  
Of course... but how should I handle them? Should I catch just one exception? 
What should I log?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-5338) Make printing sinks non-parallel

2017-04-19 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-5338.
---
   Resolution: Won't Fix
Fix Version/s: (was: 2.0.0)

> Make printing sinks non-parallel
> 
>
> Key: FLINK-5338
> URL: https://issues.apache.org/jira/browse/FLINK-5338
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Stephan Ewen
>Priority: Minor
>
> This is a suggestion, up for discussion.
> I suggest to make the {{DataStream::print()}} sink function inherently 
> non-parallel, i.e., always executed in a single parallel sink.
> Since printing is only for toy setups anyways, and the performance is usually 
> bound by the stdout stream, non-parallel prints should be sufficient.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5338) Make printing sinks non-parallel

2017-04-19 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975235#comment-15975235
 ] 

Stephan Ewen commented on FLINK-5338:
-

I am okay with closing it...

> Make printing sinks non-parallel
> 
>
> Key: FLINK-5338
> URL: https://issues.apache.org/jira/browse/FLINK-5338
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Stephan Ewen
>Priority: Minor
>
> This is a suggestion, up for discussion.
> I suggest to make the {{DataStream::print()}} sink function inherently 
> non-parallel, i.e., always executed in a single parallel sink.
> Since printing is only for toy setups anyways, and the performance is usually 
> bound by the stdout stream, non-parallel prints should be sufficient.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3328) Incorrectly shaded dependencies in flink-runtime

2017-04-19 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975227#comment-15975227
 ] 

Stephan Ewen commented on FLINK-3328:
-

That is not a problem. Guava and asm must be in each artifact ({{flink-core}}, 
{{flink-runtime}}, etc.) to make them self-contained.
When assembling the final jar, they all have guava, so you have an overlap of 
the exact same classes.

The alternative is to relocate it to a different namespace in each artifact, 
but then we get guava 10 times into the {{flink-dist}}, and that is not very 
desirable.
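
For reference, such a relocation in the maven-shade-plugin configuration would 
look like the following sketch (the shaded package name matches the warnings 
quoted below):

{code:xml}
<relocation>
	<pattern>com.google.common</pattern>
	<shadedPattern>org.apache.flink.shaded.com.google.common</shadedPattern>
</relocation>
{code}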

> Incorrectly shaded dependencies in flink-runtime
> 
>
> Key: FLINK-3328
> URL: https://issues.apache.org/jira/browse/FLINK-3328
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Robert Metzger
>Priority: Blocker
> Fix For: 1.0.0
>
>
> There are apparently some dependencies shaded into {{flink-runtime}} fat jar 
> that are not relocated. (the flink-runtime jar is now 70 MB)
> From the output of the shading in flink-dist, it looks as if this concerns at 
> least
>   - Zookeeper
>   - slf4j
>   - jline
>   - netty (3.x)
> Possibly more.
> {code}
> [WARNING] zookeeper-3.4.6.jar, flink-runtime_2.10-1.0-SNAPSHOT.jar define 440 
> overlapping classes: 
> [WARNING]   - org.apache.zookeeper.server.NettyServerCnxnFactory
> [WARNING]   - org.apache.jute.compiler.JFile
> [WARNING]   - org.apache.zookeeper.server.SessionTracker$Session
> [WARNING]   - org.apache.zookeeper.server.quorum.AuthFastLeaderElection$1
> [WARNING]   - org.apache.jute.compiler.JLong
> [WARNING]   - org.apache.zookeeper.client.ZooKeeperSaslClient$SaslState
> [WARNING]   - org.apache.zookeeper.server.auth.KerberosName$Rule
> [WARNING]   - org.apache.jute.CsvOutputArchive
> [WARNING]   - org.apache.zookeeper.server.quorum.QuorumPeer
> [WARNING]   - org.apache.zookeeper.ZooKeeper$DataWatchRegistration
> [WARNING]   - 430 more...
> [WARNING] slf4j-api-1.7.7.jar, flink-runtime_2.10-1.0-SNAPSHOT.jar define 24 
> overlapping classes: 
> [WARNING]   - org.slf4j.spi.MarkerFactoryBinder
> [WARNING]   - org.slf4j.helpers.SubstituteLogger
> [WARNING]   - org.slf4j.helpers.BasicMarker
> [WARNING]   - org.slf4j.helpers.Util
> [WARNING]   - org.slf4j.LoggerFactory
> [WARNING]   - org.slf4j.Marker
> [WARNING]   - org.slf4j.helpers.NamedLoggerBase
> [WARNING]   - org.slf4j.Logger
> [WARNING]   - org.slf4j.spi.LocationAwareLogger
> [WARNING]   - org.slf4j.ILoggerFactory
> [WARNING]   - 14 more...
> [WARNING] jansi-1.4.jar, jline-2.10.4.jar define 23 overlapping classes: 
> [WARNING]   - org.fusesource.jansi.Ansi$Erase
> [WARNING]   - org.fusesource.jansi.Ansi
> [WARNING]   - org.fusesource.jansi.AnsiOutputStream
> [WARNING]   - org.fusesource.jansi.internal.CLibrary
> [WARNING]   - org.fusesource.jansi.Ansi$2
> [WARNING]   - org.fusesource.jansi.WindowsAnsiOutputStream
> [WARNING]   - org.fusesource.jansi.AnsiRenderer$Code
> [WARNING]   - org.fusesource.jansi.AnsiConsole
> [WARNING]   - org.fusesource.jansi.Ansi$Attribute
> [WARNING]   - org.fusesource.jansi.internal.Kernel32
> [WARNING]   - 13 more...
> [WARNING] commons-beanutils-core-1.8.0.jar, commons-collections-3.2.2.jar, 
> commons-beanutils-1.7.0.jar define 10 overlapping classes: 
> [WARNING]   - org.apache.commons.collections.FastHashMap$EntrySet
> [WARNING]   - org.apache.commons.collections.ArrayStack
> [WARNING]   - org.apache.commons.collections.FastHashMap$1
> [WARNING]   - org.apache.commons.collections.FastHashMap$KeySet
> [WARNING]   - org.apache.commons.collections.FastHashMap$CollectionView
> [WARNING]   - org.apache.commons.collections.BufferUnderflowException
> [WARNING]   - org.apache.commons.collections.Buffer
> [WARNING]   - 
> org.apache.commons.collections.FastHashMap$CollectionView$CollectionViewIterator
> [WARNING]   - org.apache.commons.collections.FastHashMap$Values
> [WARNING]   - org.apache.commons.collections.FastHashMap
> [WARNING] flink-streaming-scala_2.10-1.0-SNAPSHOT.jar, 
> flink-core-1.0-SNAPSHOT.jar, flink-runtime_2.10-1.0-SNAPSHOT.jar, 
> flink-java-1.0-SNAPSHOT.jar, flink-streaming-java_2.10-1.0-SNAPSHOT.jar, 
> flink-scala_2.10-1.0-SNAPSHOT.jar, flink-clients_2.10-1.0-SNAPSHOT.jar, 
> flink-optimizer_2.10-1.0-SNAPSHOT.jar, 
> flink-runtime-web_2.10-1.0-SNAPSHOT.jar define 1690 overlapping classes: 
> [WARNING]   - 
> org.apache.flink.shaded.com.google.common.collect.LinkedListMultimap
> [WARNING]   - 
> org.apache.flink.shaded.com.google.common.io.ByteSource$AsCharSource
> [WARNING]   - org.apache.flink.shaded.com.google.common.escape.Platform
> [WARNING]   - 
> org.apache.flink.shaded.com.google.common.util.concurrent.Futures$ImmediateFailedCheckedFuture
> [WARNING]   - 
> 

[jira] [Commented] (FLINK-5623) TempBarrier dam has been closed

2017-04-19 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975220#comment-15975220
 ] 

Stephan Ewen commented on FLINK-5623:
-

I think the issue is that the "pipeline breaker" is not properly reset. The 
logic closes it, but then re-references the closed instance on re-creation.

The fix could be to add the following at {{BatchTask}}, line 887:
{code}
...
this.tempBarriers[i].close();
this.tempBarriers[i] = null;   // <<= this is the new line
{code}
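Spelled out as a loop, the proposed reset would look roughly like this; a 
sketch only, assuming {{tempBarriers}} is the array of closeable barriers in 
{{BatchTask}}:

{code:java}
// Close each barrier and drop the reference, so that a later
// re-creation cannot pick up the already-closed instance.
for (int i = 0; i < this.tempBarriers.length; i++) {
    if (this.tempBarriers[i] != null) {
        this.tempBarriers[i].close();
        this.tempBarriers[i] = null;
    }
}
{code}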


> TempBarrier dam has been closed
> ---
>
> Key: FLINK-5623
> URL: https://issues.apache.org/jira/browse/FLINK-5623
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.3.0
>Reporter: Greg Hogan
>
> PageRank (see PR from FLINK-4896) results in the following error. Can be 
> reproduced by changing {{AsmTestBase:63}} to {{env = 
> executionEnvironment.createLocalEnvironment();}} then running 
> {{PageRankTest}} (fails for Simple and RMat graph tests, succeeds for 
> Complete graph test).
> {noformat}
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>   at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:387)
>   at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>   at 
> org.apache.flink.graph.asm.dataset.AbstractDataSetAnalytic.execute(AbstractDataSetAnalytic.java:55)
>   at org.apache.flink.graph.drivers.PageRank.print(PageRank.java:113)
>   at org.apache.flink.graph.Runner.main(Runner.java:257)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:834)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1076)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1123)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1120)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:905)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:848)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:848)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: An error occurred creating the temp 
> table.
>   at 
> 

[jira] [Commented] (FLINK-6315) Notify on checkpoint timeout

2017-04-19 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975203#comment-15975203
 ] 

Stephan Ewen commented on FLINK-6315:
-

I think you are thinking about it the right way. When checkpoint 2 does not 
happen for whatever reason, checkpoint 3 should be in charge of everything 
since the last successful checkpoint.

I see the problem now: when checkpoint 3 starts, you may not yet know whether 
checkpoint 2 is actually going to complete. To make it trickier, checkpoint 2 
may actually fail (due to a timeout) after checkpoint 3 completes.

In the incremental checkpointing code, we have a similar problem. In that case, 
we can only re-reference a diff if it is part of a completed checkpoint. If, 
for example, checkpoint 2 is not complete when checkpoint 3 is started, then 
checkpoint 3 builds on checkpoint 1, not on checkpoint 2.

[~aljoscha] How is that handled in the regular bucketing sink?

> Notify on checkpoint timeout 
> -
>
> Key: FLINK-6315
> URL: https://issues.apache.org/jira/browse/FLINK-6315
> Project: Flink
>  Issue Type: New Feature
>  Components: Core
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>
> A common pattern when writing a custom operator that outputs data to some 
> third-party location is to partially output on checkpoint and then commit on 
> notifyCheckpointComplete. If that external system does not gracefully handle 
> rollbacks (such as Amazon S3 not allowing consistent delete operations), then 
> that data needs to be handled by the next checkpoint. 
> The idea is to add a new interface, similar to CheckpointListener, that 
> provides a callback when the CheckpointCoordinator times out a checkpoint:
> {code:java}
> /**
>  * This interface must be implemented by functions/operations that want to 
> receive
>  * a notification if a checkpoint has been timed out by the {@link 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator}.
>  */
> public interface CheckpointTimeoutListener {
>   /**
>* This method is called as a notification if a distributed checkpoint 
> has been timed out.
>*
>* @param checkpointId The ID of the checkpoint that has been timed out.
>* @throws Exception
>*/
>   void notifyCheckpointTimeout(long checkpointId) throws Exception;
> }
> {code}
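For illustration, a sink that stages data per checkpoint could carry 
uncommitted records forward on a timeout. A minimal sketch, assuming the 
proposed interface above is adopted and that checkpoint IDs are consecutive; 
the class name, the per-checkpoint buffer, and {{commit()}} are illustrative, 
not existing Flink API:

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.runtime.state.CheckpointListener;

public class ThirdPartySink implements CheckpointListener, CheckpointTimeoutListener {

    // Records staged per pending checkpoint, committed only on success.
    private final Map<Long, List<String>> pending = new HashMap<>();

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        List<String> staged = pending.remove(checkpointId);
        if (staged != null) {
            commit(staged); // the external system sees the data only now
        }
    }

    @Override
    public void notifyCheckpointTimeout(long checkpointId) throws Exception {
        // Roll the staged data into the next checkpoint instead of dropping it
        // (checkpointId + 1 assumes consecutive IDs, for simplicity).
        List<String> staged = pending.remove(checkpointId);
        if (staged != null) {
            pending.computeIfAbsent(checkpointId + 1, k -> new ArrayList<>()).addAll(staged);
        }
    }

    private void commit(List<String> records) {
        // write the records to the third-party location (illustrative stub)
    }
}
{code}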



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5481) Simplify Row creation

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975138#comment-15975138
 ] 

ASF GitHub Bot commented on FLINK-5481:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3127
  
Looks good, thank you!
Merging this...


> Simplify Row creation
> -
>
> Key: FLINK-5481
> URL: https://issues.apache.org/jira/browse/FLINK-5481
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Anton Solovev
>Assignee: Anton Solovev
>Priority: Trivial
>
> When we use {{ExecutionEnvironment#fromElements(X... data)}}, it takes the first 
> element of {{data}} to determine the type. If the first Row in the collection has 
> the wrong number of fields (because of nulls), the TypeExtractor returns a 
> GenericType instead of a RowTypeInfo.
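A minimal sketch of the pitfall and the usual workaround of supplying the type 
information explicitly, assuming the 1.2-era DataSet API; the field values are 
made up:

{code:java}
import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class RowTypeExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The first Row carries a null, so type extraction can fall back to GenericType:
        DataSet<Row> untyped = env.fromElements(Row.of("a", null), Row.of("b", 2));

        // Supplying a RowTypeInfo explicitly avoids the fallback:
        RowTypeInfo rowType = new RowTypeInfo(
                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
        DataSet<Row> typed = env.fromCollection(
                Arrays.asList(Row.of("a", null), Row.of("b", 2)), rowType);

        typed.print();
    }
}
{code}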



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3127: [FLINK-5481] Simplify Row creation

2017-04-19 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3127
  
Looks good, thank you!
Merging this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6312) Update curator version to 2.12.0

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975121#comment-15975121
 ] 

ASF GitHub Bot commented on FLINK-6312:
---

Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3727
  
Sure. It seems like it will take a while, but I'll try my best :)


> Update curator version to 2.12.0
> 
>
> Key: FLINK-6312
> URL: https://issues.apache.org/jira/browse/FLINK-6312
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Reporter: Tao Wang
>Assignee: Tao Wang
>
> As there's a major bug (https://issues.apache.org/jira/browse/CURATOR-344) in 
> the Curator release used by Flink, we need to update to 2.12.0 to avoid 
> potential blocking in Flink. (Flink uses Curator recipes in the checkpoint 
> coordinator, and we already encountered a problem during ZooKeeper failover 
> while trying to fix https://issues.apache.org/jira/browse/FLINK-6174.)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3727: [FLINK-6312]update curator version to 2.12.0 to avoid pot...

2017-04-19 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3727
  
Sure. It seems like it will take a while, but I'll try my best :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-6149) add additional flink logical relation nodes

2017-04-19 Thread Dmytro Shkvyra (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmytro Shkvyra closed FLINK-6149.
-
Resolution: Fixed

Ok [~ykt836], I will close it. Thanks for the clarification. 

> add additional flink logical relation nodes
> ---
>
> Key: FLINK-6149
> URL: https://issues.apache.org/jira/browse/FLINK-6149
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
> Fix For: 1.3.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6295) use LoadingCache instead of WeakHashMap to lower latency

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975118#comment-15975118
 ] 

ASF GitHub Bot commented on FLINK-6295:
---

Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3709
  
That means every time EGHolder receives a request, it will check whether the 
job status in the request is suspended, right? That would make the cache in 
EGHolder meaningless.


> use LoadingCache instead of WeakHashMap to lower latency
> 
>
> Key: FLINK-6295
> URL: https://issues.apache.org/jira/browse/FLINK-6295
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Tao Wang
>Assignee: Tao Wang
>
> Now in ExecutionGraphHolder, which is used in many handlers, we use a 
> WeakHashMap to cache ExecutionGraph(s), and its eviction is only driven by 
> garbage collection.
> The latency is too high when the JVM rarely runs GC, which leaves the status of 
> jobs or their tasks out of sync with the real ones.
> LoadingCache is a commonly used cache implementation from the Guava library; we 
> can use its time-based eviction to lower the latency of status updates.
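For illustration, a minimal sketch of the time-based eviction idea with Guava; 
the String key/value types and {{fetchExecutionGraph}} stand in for the real 
JobID/ExecutionGraph lookup and are assumptions:

{code:java}
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class ExecutionGraphCache {

    // Entries are reloaded after a fixed time, instead of waiting for a GC cycle.
    private final LoadingCache<String, String> cache = CacheBuilder.newBuilder()
            .expireAfterWrite(10, TimeUnit.SECONDS)
            .build(new CacheLoader<String, String>() {
                @Override
                public String load(String jobId) throws Exception {
                    return fetchExecutionGraph(jobId);
                }
            });

    public String get(String jobId) throws Exception {
        return cache.get(jobId); // loads on miss or after expiry
    }

    private String fetchExecutionGraph(String jobId) {
        // ask the JobManager for the current graph (illustrative stub)
        return "graph-for-" + jobId;
    }
}
{code}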



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3709: [FLINK-6295]use LoadingCache instead of WeakHashMap to lo...

2017-04-19 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3709
  
That means every time EGHolder receives a request, it will check whether the 
job status in the request is suspended, right? That would make the cache in 
EGHolder meaningless.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5256) Extend DataSetSingleRowJoin to support Left and Right joins

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15975114#comment-15975114
 ] 

ASF GitHub Bot commented on FLINK-5256:
---

Github user DmytroShkvyra commented on the issue:

https://github.com/apache/flink/pull/3673
  
Hi @fhueske, could you review this variant? I had to remove the tests with 
non-equality predicates (https://issues.apache.org/jira/browse/FLINK-5520).


> Extend DataSetSingleRowJoin to support Left and Right joins
> ---
>
> Key: FLINK-5256
> URL: https://issues.apache.org/jira/browse/FLINK-5256
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Dmytro Shkvyra
>
> The {{DataSetSingleRowJoin}} is a broadcast-map join that supports arbitrary 
> inner joins where one input is a single row.
> I found that Calcite translates certain subqueries into non-equi left and 
> right joins with a single-row input. These cases can be handled if the 
> {{DataSetSingleRowJoin}} is extended to support outer joins on the 
> non-single-row input, i.e., left joins if the right side is a single row and 
> vice versa.
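For illustration, this is the kind of query where Calcite produces such a plan: 
the scalar subquery yields a single row, and the non-equi comparison becomes a 
join against that single-row input. A sketch assuming the 1.2-era Java Table 
API entry points ({{TableEnvironment.getTableEnvironment}}, {{sql}}); table 
contents are made up, and whether it executes is exactly what this extension 
addresses:

{code:java}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

public class SingleRowJoinExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        DataSet<Integer> ds1 = env.fromElements(1, 2, 3);
        DataSet<Integer> ds2 = env.fromElements(10, 20);
        tEnv.registerDataSet("t1", ds1, "a");
        tEnv.registerDataSet("t2", ds2, "b");

        // The scalar subquery below is a single-row input; the > comparison is a
        // non-equi predicate, so the plan needs the extended DataSetSingleRowJoin.
        Table result = tEnv.sql("SELECT a FROM t1 WHERE a > (SELECT COUNT(*) FROM t2)");
    }
}
{code}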



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

