[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525163#comment-15525163
 ] 

ASF GitHub Bot commented on FLINK-2055:
---

Github user delding commented on the issue:

https://github.com/apache/flink/pull/2332
  
I agree it might be an overkill. But in case of having an sink that only 
supports Put/Delete, it would be better to have ordered execution than not, 
after all HBase has this API so there could be some use case that needs order 
guarantee.


> Implement Streaming HBaseSink
> -
>
> Key: FLINK-2055
> URL: https://issues.apache.org/jira/browse/FLINK-2055
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming, Streaming Connectors
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Erli Ding
>
> As per : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-26 Thread delding
Github user delding commented on the issue:

https://github.com/apache/flink/pull/2332
  
I agree it might be an overkill. But in case of having an sink that only 
supports Put/Delete, it would be better to have ordered execution than not, 
after all HBase has this API so there could be some use case that needs order 
guarantee.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525128#comment-15525128
 ] 

ASF GitHub Bot commented on FLINK-2055:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2332
  
With in a single row you need guarantee of order of execution? I agree 
Append/Increment or non-idempotent in certain failure cases but there is a 
Nonce generator for that. I should say I have not used that anyway to know the 
exact pros and cons of that. RowMutations atleast it supports only Put/Delete 
so in that angle you can be sure that for now we don't support 
Append/Increments.

>  we might need WriteAheadSink and figure out a way to roll back table to 
the last checkpoint state

This will be tricky. Your hbase table rollback would mean that you may have 
to issue Deletes here so that the previous mutations are hidden.


> Implement Streaming HBaseSink
> -
>
> Key: FLINK-2055
> URL: https://issues.apache.org/jira/browse/FLINK-2055
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming, Streaming Connectors
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Erli Ding
>
> As per : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-26 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2332
  
With in a single row you need guarantee of order of execution? I agree 
Append/Increment or non-idempotent in certain failure cases but there is a 
Nonce generator for that. I should say I have not used that anyway to know the 
exact pros and cons of that. RowMutations atleast it supports only Put/Delete 
so in that angle you can be sure that for now we don't support 
Append/Increments.

>  we might need WriteAheadSink and figure out a way to roll back table to 
the last checkpoint state

This will be tricky. Your hbase table rollback would mean that you may have 
to issue Deletes here so that the previous mutations are hidden.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525108#comment-15525108
 ] 

ASF GitHub Bot commented on FLINK-2055:
---

Github user delding commented on the issue:

https://github.com/apache/flink/pull/2332
  
Hi @ramkrish86 , I'm thinking replace batch() with mutateRow() because it 
provides atomic ordered mutations for a single row, but it only supports Put 
and Delete which should be fine since only Put and Delete are idempotent, this 
way we can implement Put and Delete without using WriteAheadSink (in case of 
deterministic processing). What do you think? 

Regarding Append and Delete, as HBase doesn't support distributed 
transaction across multiple rows, we might need WriteAheadSink and figure out a 
way to roll back table to the last checkpoint state. I'm thinking about this 
right now.

So it might make sense to have two HBaseSinks, one for Put/Delet, the other 
for Append/Delete and non-deterministic processing.


> Implement Streaming HBaseSink
> -
>
> Key: FLINK-2055
> URL: https://issues.apache.org/jira/browse/FLINK-2055
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming, Streaming Connectors
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Erli Ding
>
> As per : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-26 Thread delding
Github user delding commented on the issue:

https://github.com/apache/flink/pull/2332
  
Hi @ramkrish86 , I'm thinking replace batch() with mutateRow() because it 
provides atomic ordered mutations for a single row, but it only supports Put 
and Delete which should be fine since only Put and Delete are idempotent, this 
way we can implement Put and Delete without using WriteAheadSink (in case of 
deterministic processing). What do you think? 

Regarding Append and Delete, as HBase doesn't support distributed 
transaction across multiple rows, we might need WriteAheadSink and figure out a 
way to roll back table to the last checkpoint state. I'm thinking about this 
right now.

So it might make sense to have two HBaseSinks, one for Put/Delet, the other 
for Append/Delete and non-deterministic processing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525031#comment-15525031
 ] 

ASF GitHub Bot commented on FLINK-2055:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2332
  
@delding 
Do you have uses cases with Append/Increment? 
I think with the batch() API we are not sure of the order of execution of 
the batch() in the hbase server but still it would help us to achieve the 
atleast-once guarantee as @zentol  mentioned here.



> Implement Streaming HBaseSink
> -
>
> Key: FLINK-2055
> URL: https://issues.apache.org/jira/browse/FLINK-2055
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming, Streaming Connectors
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Erli Ding
>
> As per : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-26 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/2332
  
@delding 
Do you have uses cases with Append/Increment? 
I think with the batch() API we are not sure of the order of execution of 
the batch() in the hbase server but still it would help us to achieve the 
atleast-once guarantee as @zentol  mentioned here.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4690) Replace SlotAllocationFuture with flink's own future

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525008#comment-15525008
 ] 

ASF GitHub Bot commented on FLINK-4690:
---

GitHub user KurtYoung opened a pull request:

https://github.com/apache/flink/pull/2552

[FLINK-4690] Replace SlotAllocationFuture with flink's own future



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/KurtYoung/flink flink-4690

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2552.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2552


commit f2303f94b5cc18086f5bf34566079cc86a5187e9
Author: Kurt Young 
Date:   2016-09-27T04:10:08Z

[FLINK-4690] Replace SlotAllocationFuture with flink's own future




> Replace SlotAllocationFuture with flink's own future
> 
>
> Key: FLINK-4690
> URL: https://issues.apache.org/jira/browse/FLINK-4690
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Kurt Young
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2552: [FLINK-4690] Replace SlotAllocationFuture with fli...

2016-09-26 Thread KurtYoung
GitHub user KurtYoung opened a pull request:

https://github.com/apache/flink/pull/2552

[FLINK-4690] Replace SlotAllocationFuture with flink's own future



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/KurtYoung/flink flink-4690

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2552.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2552


commit f2303f94b5cc18086f5bf34566079cc86a5187e9
Author: Kurt Young 
Date:   2016-09-27T04:10:08Z

[FLINK-4690] Replace SlotAllocationFuture with flink's own future




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4690) Replace SlotAllocationFuture with flink's own future

2016-09-26 Thread Kurt Young (JIRA)
Kurt Young created FLINK-4690:
-

 Summary: Replace SlotAllocationFuture with flink's own future
 Key: FLINK-4690
 URL: https://issues.apache.org/jira/browse/FLINK-4690
 Project: Flink
  Issue Type: Sub-task
Reporter: Kurt Young
Assignee: Kurt Young






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524939#comment-15524939
 ] 

ASF GitHub Bot commented on FLINK-4035:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2369
  
@rmetzger Thanks for addressing the comments! Did a final pass, and the 
changes look good to me.
I agree with merging the connector as is. Adding the timestamp to the 
regular sink interface seems like a good long term solution.

+1 to merge once travis turns green ;)


> Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
> ---
>
> Key: FLINK-4035
> URL: https://issues.apache.org/jira/browse/FLINK-4035
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Minor
>
> Kafka 0.10.0.0 introduced protocol changes related to the producer.  
> Published messages now include timestamps and compressed messages now include 
> relative offsets.  As it is now, brokers must decompress publisher compressed 
> messages, assign offset to them, and recompress them, which is wasteful and 
> makes it less likely that compression will be used at all.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2369: [FLINK-4035] Add a streaming connector for Apache Kafka 0...

2016-09-26 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2369
  
@rmetzger Thanks for addressing the comments! Did a final pass, and the 
changes look good to me.
I agree with merging the connector as is. Adding the timestamp to the 
regular sink interface seems like a good long term solution.

+1 to merge once travis turns green ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4606) Integrate the new ResourceManager with the existing FlinkResourceManager

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524850#comment-15524850
 ] 

ASF GitHub Bot commented on FLINK-4606:
---

Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2540#discussion_r80612002
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -66,15 +67,16 @@
  * {@link #requestSlot(SlotRequest)} requests a slot from the 
resource manager
  * 
  */
-public class ResourceManager extends RpcEndpoint 
implements LeaderContender {
+public abstract class ResourceManager extends RpcEndpoint implements 
LeaderContender {
--- End diff --

@mxm , I adopt `ResourceManager
extends RpcEndpoint ` at first time, but it 
would fail because of an Exception when I wanna to start a subClass of this 
ResourceManager. For example, `public class StandaloneResourceManager extends 
ResourceManager`, when I start this ResourceManager, 
it would call AkkaRpcService.#startServer, an exception would be thrown here 
because selfGatewayType was mistake for TaskExecutorRegistration class. So I 
change it to `ResourceManager extends RpcEndpoint`


> Integrate the new ResourceManager with the existing FlinkResourceManager
> 
>
> Key: FLINK-4606
> URL: https://issues.apache.org/jira/browse/FLINK-4606
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: zhangjing
>Assignee: zhangjing
>
> Integrate the new ResourceManager with the existing FlinkResourceManager.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2540: [FLINK-4606] [cluster management] Integrate the ne...

2016-09-26 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2540#discussion_r80612002
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -66,15 +67,16 @@
  * {@link #requestSlot(SlotRequest)} requests a slot from the 
resource manager
  * 
  */
-public class ResourceManager extends RpcEndpoint 
implements LeaderContender {
+public abstract class ResourceManager extends RpcEndpoint implements 
LeaderContender {
--- End diff --

@mxm , I adopt `ResourceManager
extends RpcEndpoint ` at first time, but it 
would fail because of an Exception when I wanna to start a subClass of this 
ResourceManager. For example, `public class StandaloneResourceManager extends 
ResourceManager`, when I start this ResourceManager, 
it would call AkkaRpcService.#startServer, an exception would be thrown here 
because selfGatewayType was mistake for TaskExecutorRegistration class. So I 
change it to `ResourceManager extends RpcEndpoint`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4653) Refactor JobClientActor to adapt to the new Rpc framework and new cluster managerment

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524806#comment-15524806
 ] 

ASF GitHub Bot commented on FLINK-4653:
---

Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/2524
  
@mxm , thanks for your review. I modified the content based on your advice:
1. The changes could compile successfully now, sorry for the level mistake.
2. AwaitJobResult method in JobClientUtils would retry upon 
TimeoutException utils JobInfoTracker seems to be dead.


> Refactor JobClientActor to adapt to the new Rpc framework and new cluster 
> managerment
> -
>
> Key: FLINK-4653
> URL: https://issues.apache.org/jira/browse/FLINK-4653
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Reporter: zhangjing
>Assignee: zhangjing
> Fix For: 1.2.0
>
>
> 1. Create a RpcEndpoint(temporary named JobInfoTracker) and 
> RpcGateway(temporary named JobInfoTrackerGateway) to replace the old 
> JobClientActor. 
> 2. Change rpc message communication in JobClientActor to rpc method call to 
> apply to the new rpc framework. 
> 3. JobInfoTracker is responsible for waiting for the jobStateChange and 
> jobResult util job complete. But it is not responsible for submitting new job 
> because jobSubmission behavior is different in different cluster



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2524: [FLINK-4653] [Client] Refactor JobClientActor to adapt to...

2016-09-26 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/2524
  
@mxm , thanks for your review. I modified the content based on your advice:
1. The changes could compile successfully now, sorry for the level mistake.
2. AwaitJobResult method in JobClientUtils would retry upon 
TimeoutException utils JobInfoTracker seems to be dead.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4689) Implement a simple slot provider for the new job manager

2016-09-26 Thread Kurt Young (JIRA)
Kurt Young created FLINK-4689:
-

 Summary: Implement a simple slot provider for the new job manager
 Key: FLINK-4689
 URL: https://issues.apache.org/jira/browse/FLINK-4689
 Project: Flink
  Issue Type: Sub-task
Reporter: Kurt Young
Assignee: Kurt Young


In flip-6 branch, we need to adjust existing scheduling model. In the first 
step, we should introduce a simple / naive slot provider which just ignore all 
the sharing or location constraint, to make whole thing work. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4408) Submit Job and setup ExecutionGraph

2016-09-26 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young resolved FLINK-4408.
---
Resolution: Fixed

> Submit Job and setup ExecutionGraph
> ---
>
> Key: FLINK-4408
> URL: https://issues.apache.org/jira/browse/FLINK-4408
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Xiaogang Shi
>Assignee: Kurt Young
>
> Once granted the leadership, JM will start to execute the job.
> Most code remains the same except that 
> (1) In old implementation where JM manages the execution of multiple jobs, JM 
> has to load all submitted JobGraphs from SubmittedJobGraphStore and recover 
> them. Now that the components creating JM will be responsible for the 
> recovery of JobGraphs, JM will be created with submitted/recovered JobGraph, 
> without the need to load the JobGraph.
> (2) JM should not rely on Akka to listen on the updates of JobStatus and 
> Execution.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4408) Submit Job and setup ExecutionGraph

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524664#comment-15524664
 ] 

ASF GitHub Bot commented on FLINK-4408:
---

Github user KurtYoung closed the pull request at:

https://github.com/apache/flink/pull/2480


> Submit Job and setup ExecutionGraph
> ---
>
> Key: FLINK-4408
> URL: https://issues.apache.org/jira/browse/FLINK-4408
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Xiaogang Shi
>Assignee: Kurt Young
>
> Once granted the leadership, JM will start to execute the job.
> Most code remains the same except that 
> (1) In old implementation where JM manages the execution of multiple jobs, JM 
> has to load all submitted JobGraphs from SubmittedJobGraphStore and recover 
> them. Now that the components creating JM will be responsible for the 
> recovery of JobGraphs, JM will be created with submitted/recovered JobGraph, 
> without the need to load the JobGraph.
> (2) JM should not rely on Akka to listen on the updates of JobStatus and 
> Execution.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2480: [FLINK-4408][JobManager] Introduce JobMasterRunner...

2016-09-26 Thread KurtYoung
Github user KurtYoung closed the pull request at:

https://github.com/apache/flink/pull/2480


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524101#comment-15524101
 ] 

ASF GitHub Bot commented on FLINK-4280:
---

Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/2509
  
@tzulitai makes sense ! As for for the Map you are right, the 
multiple topic case slipped my mind :)


> New Flink-specific option to set starting position of Kafka consumer without 
> respecting external offsets in ZK / Broker
> ---
>
> Key: FLINK-4280
> URL: https://issues.apache.org/jira/browse/FLINK-4280
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Currently, to start reading from the "earliest" and "latest" position in 
> topics for the Flink Kafka consumer, users set the Kafka config 
> {{auto.offset.reset}} in the provided properties configuration.
> However, the way this config actually works might be a bit misleading if 
> users were trying to find a way to "read topics from a starting position". 
> The way the {{auto.offset.reset}} config works in the Flink Kafka consumer 
> resembles Kafka's original intent for the setting: first, existing external 
> offsets committed to the ZK / brokers will be checked; if none exists, then 
> will {{auto.offset.reset}} be respected.
> I propose to add Flink-specific ways to define the starting position, without 
> taking into account the external offsets. The original behaviour (reference 
> external offsets first) can be changed to be a user option, so that the 
> behaviour can be retained for frequent Kafka users that may need some 
> collaboration with existing non-Flink Kafka consumer applications.
> How users will interact with the Flink Kafka consumer after this is added, 
> with a newly introduced {{flink.starting-position}} config:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "earliest/latest");
> props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a 
> warning)
> props.setProperty("group.id", "...") // this won't have effect on the 
> starting position anymore (may still be used in external offset committing)
> ...
> {code}
> Or, reference external offsets in ZK / broker:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "external-offsets");
> props.setProperty("auto.offset.reset", "earliest/latest"); // default will be 
> latest
> props.setProperty("group.id", "..."); // will be used to lookup external 
> offsets in ZK / broker on startup
> ...
> {code}
> A thing we would need to decide on is what would the default value be for 
> {{flink.starting-position}}.
> Two merits I see in adding this:
> 1. This compensates the way users generally interpret "read from a starting 
> position". As the Flink Kafka connector is somewhat essentially a 
> "high-level" Kafka consumer for Flink users, I think it is reasonable to add 
> Flink-specific functionality that users will find useful, although it wasn't 
> supported in Kafka's original consumer designs.
> 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is 
> used only to expose progress to the outside world, and not used to manipulate 
> how Kafka topics are read in Flink (unless users opt to do so)" is even more 
> definite and solid. There was some discussion in this PR 
> (https://github.com/apache/flink/pull/1690, FLINK-3398) on this aspect. I 
> think adding this "decouples" more Flink's internal offset checkpointing from 
> the external Kafka's offset store.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

2016-09-26 Thread gyfora
Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/2509
  
@tzulitai makes sense ! As for for the Map you are right, the 
multiple topic case slipped my mind :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-598) Add support for globalOrdering in DataSet API

2016-09-26 Thread Neelesh Srinivas Salian (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neelesh Srinivas Salian closed FLINK-598.
-
Resolution: Fixed

Closing since the sub-tasks are Closed and Resolved.

> Add support for globalOrdering in DataSet API
> -
>
> Key: FLINK-598
> URL: https://issues.apache.org/jira/browse/FLINK-598
> Project: Flink
>  Issue Type: Improvement
>Reporter: GitHub Import
>  Labels: github-import
>
> There is no support for globalOrdering at the moment. In the Record API, it 
> was possible to hand an Ordering and a Distribution to a FileDataSink. In the 
> DataSet API, such a feature is still missing.
>  Imported from GitHub 
> Url: https://github.com/stratosphere/stratosphere/issues/598
> Created by: [skunert|https://github.com/skunert]
> Labels: enhancement, java api, user satisfaction, 
> Assignee: [fhueske|https://github.com/fhueske]
> Created at: Mon Mar 17 14:08:05 CET 2014
> State: open



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4439) Error message KafkaConsumer08 when all 'bootstrap.servers' are invalid

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523891#comment-15523891
 ] 

ASF GitHub Bot commented on FLINK-4439:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2397
  
I've rebased to current master and triggered a build. 
https://travis-ci.org/rmetzger/flink/builds/162871029



> Error message KafkaConsumer08 when all 'bootstrap.servers' are invalid
> --
>
> Key: FLINK-4439
> URL: https://issues.apache.org/jira/browse/FLINK-4439
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.3
>Reporter: Gheorghe Gheorghe
>Priority: Minor
>
> The "flink-connector-kafka-0.8_2"  is logging the following error when all 
> 'bootstrap.servers' are invalid when passed to the FlinkKafkaConsumer08. 
> See stacktrace: 
> {code:title=stacktrace|borderStyle=solid}
> 2016-08-21 15:22:30 WARN  FlinkKafkaConsumerBase:290 - Error communicating 
> with broker inexistentKafkHost:9092 to find partitions for [testTopic].class 
> java.nio.channels.ClosedChannelException. Message: null
> 2016-08-21 15:22:30 DEBUG FlinkKafkaConsumerBase:292 - Detailed trace
> java.nio.channels.ClosedChannelException
>   at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
>   at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
>   at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
>   at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:264)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:193)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:164)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:131)
>   at MetricsFromKafka$.main(MetricsFromKafka.scala:38)
>   at MetricsFromKafka.main(MetricsFromKafka.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at sbt.Run.invokeMain(Run.scala:67)
>   at sbt.Run.run0(Run.scala:61)
>   at sbt.Run.sbt$Run$$execute$1(Run.scala:51)
>   at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:55)
>   at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
>   at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
>   at sbt.Logger$$anon$4.apply(Logger.scala:84)
>   at sbt.TrapExit$App.run(TrapExit.scala:248)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> In the above stackrace it is hard to figure out that the actual servers 
> provided as a config cannot be resolved to a valid ip address. Moreover the 
> flink kafka consumer will try all of those servers one by one and failing to 
> get partition information.
> The suggested improvement is to fail fast and announce the user that the 
> servers provided in the 'boostrap.servers' config are invalid. If at least 
> one server is valid then the exception should not be thrown. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2397: [FLINK-4439] Validate 'bootstrap.servers' config in flink...

2016-09-26 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2397
  
I've rebased to current master and triggered a build. 
https://travis-ci.org/rmetzger/flink/builds/162871029



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523868#comment-15523868
 ] 

ASF GitHub Bot commented on FLINK-2055:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2332
  
@nielsbasjes I want to avoid to exclude a version that is still in use by 
existing Flink users. I do not have in insight in which HBase versions 
currently in use. If (basically) everybody is on 1.1.x I am definitely in favor 
of bumping the version and API for Flink 1.2.0. Let's move the discussion to 
the dev and user mailing lists.


> Implement Streaming HBaseSink
> -
>
> Key: FLINK-2055
> URL: https://issues.apache.org/jira/browse/FLINK-2055
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming, Streaming Connectors
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Erli Ding
>
> As per : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-26 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2332
  
@nielsbasjes I want to avoid to exclude a version that is still in use by 
existing Flink users. I do not have in insight in which HBase versions 
currently in use. If (basically) everybody is on 1.1.x I am definitely in favor 
of bumping the version and API for Flink 1.2.0. Let's move the discussion to 
the dev and user mailing lists.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523849#comment-15523849
 ] 

ASF GitHub Bot commented on FLINK-2055:
---

Github user nielsbasjes commented on the issue:

https://github.com/apache/flink/pull/2332
  
@fhueske: What is "older" ?
I would like a clear statement about the (minimal) supported versions of 
HBase. 
I would see 1.1.x as old enough, or do you see 0.98 still required?


> Implement Streaming HBaseSink
> -
>
> Key: FLINK-2055
> URL: https://issues.apache.org/jira/browse/FLINK-2055
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming, Streaming Connectors
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Erli Ding
>
> As per : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-26 Thread nielsbasjes
Github user nielsbasjes commented on the issue:

https://github.com/apache/flink/pull/2332
  
@fhueske: What is "older" ?
I would like a clear statement about the (minimal) supported versions of 
HBase. 
I would see 1.1.x as old enough, or do you see 0.98 still required?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-4688) Optimizer hangs for hours when optimizing complex plans

2016-09-26 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-4688:
-
Attachment: stacktrace_76minsAfterSubmission.java

Stacktrace 76 mins after submission

> Optimizer hangs for hours when optimizing complex plans
> ---
>
> Key: FLINK-4688
> URL: https://issues.apache.org/jira/browse/FLINK-4688
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Fabian Hueske
> Attachments: stacktrace_32minsAfterSubmission.java, 
> stacktrace_76minsAfterSubmission.java, stacktrace_shortlyAfterSubmission.java
>
>
> When optimizing a plan with many operators (more than 250), the optimizer 
> gets stuck for hours.
> A user reported this problem on the user@f.a.o list [1] and provided 
> stacktraces taken at different points in time (shortly after submission, 32 
> minutes and 76 minutes after submission) (see attachments).
> The stacktraces show deeply recursive {{hasDamOnPathDownTo()}} calls. Maybe 
> it is possible to improve the performance by caching the results?
> [1] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Complex-batch-workflow-needs-too-much-time-to-create-executionPlan-tp8596.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4688) Optimizer hangs for hours when optimizing complex plans

2016-09-26 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-4688:
-
Attachment: stacktrace_shortlyAfterSubmission.java

Stacktrace shortly after submission

> Optimizer hangs for hours when optimizing complex plans
> ---
>
> Key: FLINK-4688
> URL: https://issues.apache.org/jira/browse/FLINK-4688
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Fabian Hueske
> Attachments: stacktrace_32minsAfterSubmission.java, 
> stacktrace_shortlyAfterSubmission.java
>
>
> When optimizing a plan with many operators (more than 250), the optimizer 
> gets stuck for hours.
> A user reported this problem on the user@f.a.o list [1] and provided 
> stacktraces taken at different points in time (shortly after submission, 32 
> minutes and 76 minutes after submission) (see attachments).
> The stacktraces show deeply recursive {{hasDamOnPathDownTo()}} calls. Maybe 
> it is possible to improve the performance by caching the results?
> [1] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Complex-batch-workflow-needs-too-much-time-to-create-executionPlan-tp8596.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4688) Optimizer hangs for hours when optimizing complex plans

2016-09-26 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-4688:
-
Attachment: stacktrace_32minsAfterSubmission.java

Stacktrace 32 mins after submission


> Optimizer hangs for hours when optimizing complex plans
> ---
>
> Key: FLINK-4688
> URL: https://issues.apache.org/jira/browse/FLINK-4688
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Fabian Hueske
> Attachments: stacktrace_32minsAfterSubmission.java, 
> stacktrace_shortlyAfterSubmission.java
>
>
> When optimizing a plan with many operators (more than 250), the optimizer 
> gets stuck for hours.
> A user reported this problem on the user@f.a.o list [1] and provided 
> stacktraces taken at different points in time (shortly after submission, 32 
> minutes and 76 minutes after submission) (see attachments).
> The stacktraces show deeply recursive {{hasDamOnPathDownTo()}} calls. Maybe 
> it is possible to improve the performance by caching the results?
> [1] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Complex-batch-workflow-needs-too-much-time-to-create-executionPlan-tp8596.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4688) Optimizer hangs for hours when optimizing complex plans

2016-09-26 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-4688:


 Summary: Optimizer hangs for hours when optimizing complex plans
 Key: FLINK-4688
 URL: https://issues.apache.org/jira/browse/FLINK-4688
 Project: Flink
  Issue Type: Bug
  Components: Optimizer
Affects Versions: 1.2.0, 1.1.3
Reporter: Fabian Hueske


When optimizing a plan with many operators (more than 250), the optimizer gets 
stuck for hours.

A user reported this problem on the user@f.a.o list [1] and provided 
stacktraces taken at different points in time (shortly after submission, 32 
minutes and 76 minutes after submission) (see attachments).

The stacktraces show deeply recursive {{hasDamOnPathDownTo()}} calls. Maybe it 
is possible to improve the performance by caching the results?

[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Complex-batch-workflow-needs-too-much-time-to-create-executionPlan-tp8596.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523705#comment-15523705
 ] 

ASF GitHub Bot commented on FLINK-4280:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2509
  
Thank you for working on this. I gave #2369 some love today to speed up 
things ;)


> New Flink-specific option to set starting position of Kafka consumer without 
> respecting external offsets in ZK / Broker
> ---
>
> Key: FLINK-4280
> URL: https://issues.apache.org/jira/browse/FLINK-4280
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Currently, to start reading from the "earliest" and "latest" position in 
> topics for the Flink Kafka consumer, users set the Kafka config 
> {{auto.offset.reset}} in the provided properties configuration.
> However, the way this config actually works might be a bit misleading if 
> users were trying to find a way to "read topics from a starting position". 
> The way the {{auto.offset.reset}} config works in the Flink Kafka consumer 
> resembles Kafka's original intent for the setting: first, existing external 
> offsets committed to the ZK / brokers will be checked; if none exists, then 
> will {{auto.offset.reset}} be respected.
> I propose to add Flink-specific ways to define the starting position, without 
> taking into account the external offsets. The original behaviour (reference 
> external offsets first) can be changed to be a user option, so that the 
> behaviour can be retained for frequent Kafka users that may need some 
> collaboration with existing non-Flink Kafka consumer applications.
> How users will interact with the Flink Kafka consumer after this is added, 
> with a newly introduced {{flink.starting-position}} config:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "earliest/latest");
> props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a 
> warning)
> props.setProperty("group.id", "...") // this won't have effect on the 
> starting position anymore (may still be used in external offset committing)
> ...
> {code}
> Or, reference external offsets in ZK / broker:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "external-offsets");
> props.setProperty("auto.offset.reset", "earliest/latest"); // default will be 
> latest
> props.setProperty("group.id", "..."); // will be used to lookup external 
> offsets in ZK / broker on startup
> ...
> {code}
> A thing we would need to decide on is what would the default value be for 
> {{flink.starting-position}}.
> Two merits I see in adding this:
> 1. This compensates the way users generally interpret "read from a starting 
> position". As the Flink Kafka connector is somewhat essentially a 
> "high-level" Kafka consumer for Flink users, I think it is reasonable to add 
> Flink-specific functionality that users will find useful, although it wasn't 
> supported in Kafka's original consumer designs.
> 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is 
> used only to expose progress to the outside world, and not used to manipulate 
> how Kafka topics are read in Flink (unless users opt to do so)" is even more 
> definite and solid. There was some discussion in this PR 
> (https://github.com/apache/flink/pull/1690, FLINK-3398) on this aspect. I 
> think adding this "decouples" more Flink's internal offset checkpointing from 
> the external Kafka's offset store.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

2016-09-26 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2509
  
Thank you for working on this. I gave #2369 some love today to speed up 
things ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4439) Error message KafkaConsumer08 when all 'bootstrap.servers' are invalid

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523703#comment-15523703
 ] 

ASF GitHub Bot commented on FLINK-4439:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2397
  
Thank you. The pull request is now good to be merged!


> Error message KafkaConsumer08 when all 'bootstrap.servers' are invalid
> --
>
> Key: FLINK-4439
> URL: https://issues.apache.org/jira/browse/FLINK-4439
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.3
>Reporter: Gheorghe Gheorghe
>Priority: Minor
>
> The "flink-connector-kafka-0.8_2"  is logging the following error when all 
> 'bootstrap.servers' are invalid when passed to the FlinkKafkaConsumer08. 
> See stacktrace: 
> {code:title=stacktrace|borderStyle=solid}
> 2016-08-21 15:22:30 WARN  FlinkKafkaConsumerBase:290 - Error communicating 
> with broker inexistentKafkHost:9092 to find partitions for [testTopic].class 
> java.nio.channels.ClosedChannelException. Message: null
> 2016-08-21 15:22:30 DEBUG FlinkKafkaConsumerBase:292 - Detailed trace
> java.nio.channels.ClosedChannelException
>   at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
>   at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
>   at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
>   at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:264)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:193)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:164)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:131)
>   at MetricsFromKafka$.main(MetricsFromKafka.scala:38)
>   at MetricsFromKafka.main(MetricsFromKafka.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at sbt.Run.invokeMain(Run.scala:67)
>   at sbt.Run.run0(Run.scala:61)
>   at sbt.Run.sbt$Run$$execute$1(Run.scala:51)
>   at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:55)
>   at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
>   at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
>   at sbt.Logger$$anon$4.apply(Logger.scala:84)
>   at sbt.TrapExit$App.run(TrapExit.scala:248)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> In the above stackrace it is hard to figure out that the actual servers 
> provided as a config cannot be resolved to a valid ip address. Moreover the 
> flink kafka consumer will try all of those servers one by one and failing to 
> get partition information.
> The suggested improvement is to fail fast and announce the user that the 
> servers provided in the 'boostrap.servers' config are invalid. If at least 
> one server is valid then the exception should not be thrown. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2397: [FLINK-4439] Validate 'bootstrap.servers' config in flink...

2016-09-26 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2397
  
Thank you. The pull request is now good to be merged!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523695#comment-15523695
 ] 

ASF GitHub Bot commented on FLINK-4035:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2369
  
@tzulitai I addressed all your comments except the one relating 
`FlinkKafkaProducer010Configuration`: I had a quick offline discussion with 
@StephanEwen about the issue and he suggested to add the timestamp to the 
regular sink interface.
But I would like to make that change separate from this one, and merge the 
Kafka 0.10. support as-is. This will make it easier for people to try it out 
now and provide us with feedback. Also, I think some other Kafka related pull 
requests are blocked on this one.

@tzulitai could you do a final pass over the changes. If you agree, I'd 
like to merge it afterwards.


> Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
> ---
>
> Key: FLINK-4035
> URL: https://issues.apache.org/jira/browse/FLINK-4035
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Minor
>
> Kafka 0.10.0.0 introduced protocol changes related to the producer.  
> Published messages now include timestamps and compressed messages now include 
> relative offsets.  As it is now, brokers must decompress publisher compressed 
> messages, assign offset to them, and recompress them, which is wasteful and 
> makes it less likely that compression will be used at all.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2369: [FLINK-4035] Add a streaming connector for Apache Kafka 0...

2016-09-26 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2369
  
@tzulitai I addressed all your comments except the one relating 
`FlinkKafkaProducer010Configuration`: I had a quick offline discussion with 
@StephanEwen about the issue and he suggested to add the timestamp to the 
regular sink interface.
But I would like to make that change separate from this one, and merge the 
Kafka 0.10. support as-is. This will make it easier for people to try it out 
now and provide us with feedback. Also, I think some other Kafka related pull 
requests are blocked on this one.

@tzulitai could you do a final pass over the changes. If you agree, I'd 
like to merge it afterwards.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2529: [FLINK-4241] [table] Cryptic expression parser exc...

2016-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2529


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4241) Cryptic expression parser exceptions

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523636#comment-15523636
 ] 

ASF GitHub Bot commented on FLINK-4241:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2529


> Cryptic expression parser exceptions
> 
>
> Key: FLINK-4241
> URL: https://issues.apache.org/jira/browse/FLINK-4241
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Timo Walther
> Fix For: 1.2.0
>
>
> The exceptions thrown when giving wrong SQL syntax to Flink's SQL parser is 
> very cryptic and should be improved. For example, the following code snippet:
> {code}
> inputTable.filter("a == 0");
> {code}
> gives the following exception:
> {code}
> Exception in thread "main" 
> org.apache.flink.api.table.ExpressionParserException: Could not parse 
> expression: [1.4] failure: `-' expected but `=' found
> a == 0
>^
>   at 
> org.apache.flink.api.table.expressions.ExpressionParser$.parseExpression(ExpressionParser.scala:355)
>   at org.apache.flink.api.table.Table.filter(table.scala:161)
>   at 
> com.dataartisans.streaming.SimpleTableAPIJob.main(SimpleTableAPIJob.java:32)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> {code}
> From this description it is very hard to understand that {{==}} is not a 
> valid operator.
> Another example is:
> {code}
> inputTable.select("*");
> {code}
> which gives
> {code}
> Exception in thread "main" 
> org.apache.flink.api.table.ExpressionParserException: Could not parse 
> expression: Base Failure
>   at 
> org.apache.flink.api.table.expressions.ExpressionParser$.parseExpressionList(ExpressionParser.scala:342)
>   at org.apache.flink.api.table.Table.select(table.scala:103)
>   at 
> com.dataartisans.streaming.SimpleTableAPIJob.main(SimpleTableAPIJob.java:33)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> {code}
> I think it would considerably improve user experience if we print more 
> helpful parsing exceptions.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4241) Cryptic expression parser exceptions

2016-09-26 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-4241.
-
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed in ef15984988e883ced5311b332e5e5d8521c9573f.

> Cryptic expression parser exceptions
> 
>
> Key: FLINK-4241
> URL: https://issues.apache.org/jira/browse/FLINK-4241
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Timo Walther
> Fix For: 1.2.0
>
>
> The exceptions thrown when giving wrong SQL syntax to Flink's SQL parser is 
> very cryptic and should be improved. For example, the following code snippet:
> {code}
> inputTable.filter("a == 0");
> {code}
> gives the following exception:
> {code}
> Exception in thread "main" 
> org.apache.flink.api.table.ExpressionParserException: Could not parse 
> expression: [1.4] failure: `-' expected but `=' found
> a == 0
>^
>   at 
> org.apache.flink.api.table.expressions.ExpressionParser$.parseExpression(ExpressionParser.scala:355)
>   at org.apache.flink.api.table.Table.filter(table.scala:161)
>   at 
> com.dataartisans.streaming.SimpleTableAPIJob.main(SimpleTableAPIJob.java:32)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> {code}
> From this description it is very hard to understand that {{==}} is not a 
> valid operator.
> Another example is:
> {code}
> inputTable.select("*");
> {code}
> which gives
> {code}
> Exception in thread "main" 
> org.apache.flink.api.table.ExpressionParserException: Could not parse 
> expression: Base Failure
>   at 
> org.apache.flink.api.table.expressions.ExpressionParser$.parseExpressionList(ExpressionParser.scala:342)
>   at org.apache.flink.api.table.Table.select(table.scala:103)
>   at 
> com.dataartisans.streaming.SimpleTableAPIJob.main(SimpleTableAPIJob.java:33)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> {code}
> I think it would considerably improve user experience if we print more 
> helpful parsing exceptions.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4687) Add getAddress method to RpcService

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523629#comment-15523629
 ] 

ASF GitHub Bot commented on FLINK-4687:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2551

[FLINK-4687] [rpc] Add getAddress to RpcService

Adds the `getAddress` method to the `RpcService` which allows to retrieve 
the address under which the `RpcService` is reachable.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink extendRpcGetAddress

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2551.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2551


commit caebd864c9c2b6e2fb1c777e4b84cbc1c02a87d7
Author: Till Rohrmann 
Date:   2016-09-26T16:01:47Z

[FLINK-4687] [rpc] Add getAddress to RpcService




> Add getAddress method to RpcService
> ---
>
> Key: FLINK-4687
> URL: https://issues.apache.org/jira/browse/FLINK-4687
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> It would be useful to expose the hostname to which the {{RpcService}} has 
> been bound. This can then be used to retrieve the network interface which is 
> reachable from the outside.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2551: [FLINK-4687] [rpc] Add getAddress to RpcService

2016-09-26 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2551

[FLINK-4687] [rpc] Add getAddress to RpcService

Adds the `getAddress` method to the `RpcService` which allows to retrieve 
the address under which the `RpcService` is reachable.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink extendRpcGetAddress

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2551.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2551


commit caebd864c9c2b6e2fb1c777e4b84cbc1c02a87d7
Author: Till Rohrmann 
Date:   2016-09-26T16:01:47Z

[FLINK-4687] [rpc] Add getAddress to RpcService




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4687) Add getAddress method to RpcService

2016-09-26 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4687:


 Summary: Add getAddress method to RpcService
 Key: FLINK-4687
 URL: https://issues.apache.org/jira/browse/FLINK-4687
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination
Affects Versions: 1.2.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann


It would be useful to expose the hostname to which the {{RpcService}} has been 
bound. This can then be used to retrieve the network interface which is 
reachable from the outside.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4252) Table program cannot be compiled

2016-09-26 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-4252.
-
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed in f150f987772c8d96f41a5acd1d20cba6622cb5c9.

> Table program cannot be compiled
> 
>
> Key: FLINK-4252
> URL: https://issues.apache.org/jira/browse/FLINK-4252
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.1.0
> Environment: OS X EI Captain
> scala 2.11.7
> jdk 8
>Reporter: Renkai Ge
>Assignee: Timo Walther
> Fix For: 1.2.0
>
> Attachments: TestMain.scala
>
>
> I'm trying the table apis.
> I got some errors like this
> My code is in the attachments
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
>   at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:92)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376)
>   at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
>   at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1672)
>   at TestMain$.main(TestMain.scala:31)
>   at TestMain.main(TestMain.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:853)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: The user defined 'open(Configuration)' method 
> in class org.apache.flink.api.table.runtime.FlatMapRunner caused an 
> exception: Table program cannot be compiled. This is a bug. Please file an 
> issue.
>   at 
> org.apache.flink.runtime.operators.BatchTask.openUserCode(BatchTask.java:1337)
>   at 
> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.openTask(ChainedFlatMapDriver.java:47)
>   at 
> org.apache.flink.runtime.operators.BatchTask.openChainedTasks(BatchTask.java:1377)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:471)
>   at 
> 

[GitHub] flink pull request #2507: [FLINK-4252] [table] Validate input and output cla...

2016-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2507


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4252) Table program cannot be compiled

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523617#comment-15523617
 ] 

ASF GitHub Bot commented on FLINK-4252:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2507


> Table program cannot be compiled
> 
>
> Key: FLINK-4252
> URL: https://issues.apache.org/jira/browse/FLINK-4252
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.1.0
> Environment: OS X EI Captain
> scala 2.11.7
> jdk 8
>Reporter: Renkai Ge
>Assignee: Timo Walther
> Attachments: TestMain.scala
>
>
> I'm trying the table apis.
> I got some errors like this
> My code is in the attachments
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
>   at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:92)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376)
>   at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
>   at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
>   at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1672)
>   at TestMain$.main(TestMain.scala:31)
>   at TestMain.main(TestMain.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:853)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:799)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: The user defined 'open(Configuration)' method 
> in class org.apache.flink.api.table.runtime.FlatMapRunner caused an 
> exception: Table program cannot be compiled. This is a bug. Please file an 
> issue.
>   at 
> org.apache.flink.runtime.operators.BatchTask.openUserCode(BatchTask.java:1337)
>   at 
> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.openTask(ChainedFlatMapDriver.java:47)
>   at 
> org.apache.flink.runtime.operators.BatchTask.openChainedTasks(BatchTask.java:1377)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:471)
>   at 
> 

[jira] [Resolved] (FLINK-4590) Some Table API tests are failing when debug lvl is set to DEBUG

2016-09-26 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-4590.
-
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed in 7eb45c133c49933b14719f06bf68ccf162a3e0b2.

> Some Table API tests are failing when debug lvl is set to DEBUG
> ---
>
> Key: FLINK-4590
> URL: https://issues.apache.org/jira/browse/FLINK-4590
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Timo Walther
> Fix For: 1.2.0
>
>
> For debugging another issue, I've set the log level on travis to DEBUG.
> After that, the Table API tests started failing
> {code}
> Failed tests: 
>   SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while 
> applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while 
> applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while 
> applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while 
> applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error 
> occurred while applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error 
> occurred while applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error 
> occurred while applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error 
> occurred while applying rule DataSetScanRule
>   SetOperatorsITCase.testMinus:175 Internal error: Error occurred while 
> applying rule DataSetScanRule
>   SetOperatorsITCase.testMinus:175 Internal error: Error occurred while 
> applying rule DataSetScanRule
>   SetOperatorsITCase.testMinus:175 Internal error: Error occurred while 
> applying rule DataSetScanRule
>   SetOperatorsITCase.testMinus:175 Internal error: Error occurred while 
> applying rule DataSetScanRule
> {code}
> Probably Calcite is executing additional assertions depending on the debug 
> level.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4590) Some Table API tests are failing when debug lvl is set to DEBUG

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523591#comment-15523591
 ] 

ASF GitHub Bot commented on FLINK-4590:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2504


> Some Table API tests are failing when debug lvl is set to DEBUG
> ---
>
> Key: FLINK-4590
> URL: https://issues.apache.org/jira/browse/FLINK-4590
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Timo Walther
>
> For debugging another issue, I've set the log level on travis to DEBUG.
> After that, the Table API tests started failing
> {code}
> Failed tests: 
>   SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while 
> applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while 
> applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while 
> applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while 
> applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error 
> occurred while applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error 
> occurred while applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error 
> occurred while applying rule DataSetScanRule
>   SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error 
> occurred while applying rule DataSetScanRule
>   SetOperatorsITCase.testMinus:175 Internal error: Error occurred while 
> applying rule DataSetScanRule
>   SetOperatorsITCase.testMinus:175 Internal error: Error occurred while 
> applying rule DataSetScanRule
>   SetOperatorsITCase.testMinus:175 Internal error: Error occurred while 
> applying rule DataSetScanRule
>   SetOperatorsITCase.testMinus:175 Internal error: Error occurred while 
> applying rule DataSetScanRule
> {code}
> Probably Calcite is executing additional assertions depending on the debug 
> level.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2504: [FLINK-4590] [table] Some Table API tests are fail...

2016-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2504


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-4686) Add possibility to get column names

2016-09-26 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-4686:

Labels: starter  (was: )

> Add possibility to get column names
> ---
>
> Key: FLINK-4686
> URL: https://issues.apache.org/jira/browse/FLINK-4686
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>  Labels: starter
>
> For debugging and maybe for visualization in future (e.g. in a shell) it 
> would be good to have the possibilty to get the names of {{Table}} columns. 
> At the moment the user has no idea how the table columns are named; if they 
> need to be matched with POJO fields for example.
> My suggestion:
> {code}
> Schema s = table.schema();
> TypeInformation type = s.getType(1);
> TypeInformation type = s.getType("col");
> String s = s.getColumnName(1);
> String[] s = s.getColumnNames();
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4686) Add possibility to get column names

2016-09-26 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4686:
---

 Summary: Add possibility to get column names
 Key: FLINK-4686
 URL: https://issues.apache.org/jira/browse/FLINK-4686
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: Timo Walther


For debugging and maybe for visualization in future (e.g. in a shell) it would 
be good to have the possibilty to get the names of {{Table}} columns. At the 
moment the user has no idea how the table columns are named; if they need to be 
matched with POJO fields for example.

My suggestion:

{code}
Schema s = table.schema();
TypeInformation type = s.getType(1);
TypeInformation type = s.getType("col");
String s = s.getColumnName(1);
String[] s = s.getColumnNames();
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2535: [FLINK-4662] Bump Calcite version up to 1.9

2016-09-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2535


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-2765) Upgrade hbase version for hadoop-2 to 1.2 release

2016-09-26 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-2765:
--
Description: 
Currently 0.98.11 is used:
{code}
0.98.11-hadoop2
{code}
Stable release for hadoop-2 is 1.2.x line
We should upgrade to 1.2.1

  was:
Currently 0.98.11 is used:
{code}
0.98.11-hadoop2
{code}
Stable release for hadoop-2 is 1.1.x line
We should upgrade to 1.2.1


> Upgrade hbase version for hadoop-2 to 1.2 release
> -
>
> Key: FLINK-2765
> URL: https://issues.apache.org/jira/browse/FLINK-2765
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Priority: Minor
>
> Currently 0.98.11 is used:
> {code}
> 0.98.11-hadoop2
> {code}
> Stable release for hadoop-2 is 1.2.x line
> We should upgrade to 1.2.1



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3734) Unclosed DataInputView in AbstractAlignedProcessingTimeWindowOperator#restoreState()

2016-09-26 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-3734:
--
Description: 
{code}
DataInputView in = inputState.getState(getUserCodeClassloader());

final long nextEvaluationTime = in.readLong();
final long nextSlideTime = in.readLong();

AbstractKeyedTimePanes panes = 
createPanes(keySelector, function);
panes.readFromInput(in, keySerializer, stateTypeSerializer);

restoredState = new RestoredState<>(panes, nextEvaluationTime, 
nextSlideTime);
  }
{code}

DataInputView in is not closed upon return.

  was:
{code}
DataInputView in = inputState.getState(getUserCodeClassloader());

final long nextEvaluationTime = in.readLong();
final long nextSlideTime = in.readLong();

AbstractKeyedTimePanes panes = 
createPanes(keySelector, function);
panes.readFromInput(in, keySerializer, stateTypeSerializer);

restoredState = new RestoredState<>(panes, nextEvaluationTime, 
nextSlideTime);
  }
{code}
DataInputView in is not closed upon return.


> Unclosed DataInputView in 
> AbstractAlignedProcessingTimeWindowOperator#restoreState()
> 
>
> Key: FLINK-3734
> URL: https://issues.apache.org/jira/browse/FLINK-3734
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> DataInputView in = inputState.getState(getUserCodeClassloader());
> final long nextEvaluationTime = in.readLong();
> final long nextSlideTime = in.readLong();
> AbstractKeyedTimePanes panes = 
> createPanes(keySelector, function);
> panes.readFromInput(in, keySerializer, stateTypeSerializer);
> restoredState = new RestoredState<>(panes, nextEvaluationTime, 
> nextSlideTime);
>   }
> {code}
> DataInputView in is not closed upon return.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3801) Upgrade Joda-Time library to 2.9.3

2016-09-26 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-3801:
--
Description: 
Currently yoda-time 2.5 is used which was very old.


We should upgrade to 2.9.3

  was:
Currently yoda-time 2.5 is used which was very old.

We should upgrade to 2.9.3


> Upgrade Joda-Time library to 2.9.3
> --
>
> Key: FLINK-3801
> URL: https://issues.apache.org/jira/browse/FLINK-3801
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Priority: Minor
>
> Currently yoda-time 2.5 is used which was very old.
> We should upgrade to 2.9.3



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3222) Incorrect shift amount in OperatorCheckpointStats#hashCode()

2016-09-26 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-3222:
--
Description: 
Here is related code:
{code}
result = 31 * result + (int) (subTaskStats.length ^ (subTaskStats.length 
>>> 32));
{code}
subTaskStats.length is an int.

The shift amount is greater than 31 bits.

  was:
Here is related code:
{code}
result = 31 * result + (int) (subTaskStats.length ^ (subTaskStats.length 
>>> 32));
{code}
subTaskStats.length is an int.


The shift amount is greater than 31 bits.


> Incorrect shift amount in OperatorCheckpointStats#hashCode()
> 
>
> Key: FLINK-3222
> URL: https://issues.apache.org/jira/browse/FLINK-3222
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> Here is related code:
> {code}
> result = 31 * result + (int) (subTaskStats.length ^ (subTaskStats.length 
> >>> 32));
> {code}
> subTaskStats.length is an int.
> The shift amount is greater than 31 bits.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4573) Potential resource leak due to unclosed RandomAccessFile in TaskManagerLogHandler

2016-09-26 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-4573:
--
Description: 
{code}
try {
raf = new 
RandomAccessFile(file, "r");
} catch (FileNotFoundException 
e) {
display(ctx, request, 
"Displaying TaskManager log failed.");
LOG.error("Displaying 
TaskManager log failed.", e);
return;
}
long fileLength = raf.length();
final FileChannel fc = 
raf.getChannel();
{code}

If length() throws IOException, raf would be left unclosed.

  was:
{code}
try {
raf = new 
RandomAccessFile(file, "r");
} catch (FileNotFoundException 
e) {
display(ctx, request, 
"Displaying TaskManager log failed.");
LOG.error("Displaying 
TaskManager log failed.", e);
return;
}
long fileLength = raf.length();
final FileChannel fc = 
raf.getChannel();
{code}
If length() throws IOException, raf would be left unclosed.


> Potential resource leak due to unclosed RandomAccessFile in 
> TaskManagerLogHandler
> -
>
> Key: FLINK-4573
> URL: https://issues.apache.org/jira/browse/FLINK-4573
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> try {
> raf = new 
> RandomAccessFile(file, "r");
> } catch 
> (FileNotFoundException e) {
> display(ctx, request, 
> "Displaying TaskManager log failed.");
> LOG.error("Displaying 
> TaskManager log failed.", e);
> return;
> }
> long fileLength = 
> raf.length();
> final FileChannel fc = 
> raf.getChannel();
> {code}
> If length() throws IOException, raf would be left unclosed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()

2016-09-26 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-4534:
--
Description: 
Iteration over state.bucketStates is protected by synchronization in other 
methods, except for the following in restoreState():
{code}
for (BucketState bucketState : state.bucketStates.values()) {
{code}
and following in close():
{code}
for (Map.Entry entry : 
state.bucketStates.entrySet()) {
  closeCurrentPartFile(entry.getValue());
{code}
w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue starting 
line 752:
{code}
  Set pastCheckpointIds = 
bucketState.pendingFilesPerCheckpoint.keySet();
  LOG.debug("Moving pending files to final location on restore.");
  for (Long pastCheckpointId : pastCheckpointIds) {
{code}

  was:
Iteration over state.bucketStates is protected by synchronization in other 
methods, except for the following in restoreState():
{code}
for (BucketState bucketState : state.bucketStates.values()) {
{code}
and following in close():
{code}
for (Map.Entry entry : 
state.bucketStates.entrySet()) {
  closeCurrentPartFile(entry.getValue());
{code}
w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue starting 
line 752:

{code}
  Set pastCheckpointIds = 
bucketState.pendingFilesPerCheckpoint.keySet();
  LOG.debug("Moving pending files to final location on restore.");
  for (Long pastCheckpointId : pastCheckpointIds) {
{code}


> Lack of synchronization in BucketingSink#restoreState()
> ---
>
> Key: FLINK-4534
> URL: https://issues.apache.org/jira/browse/FLINK-4534
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>
> Iteration over state.bucketStates is protected by synchronization in other 
> methods, except for the following in restoreState():
> {code}
> for (BucketState bucketState : state.bucketStates.values()) {
> {code}
> and following in close():
> {code}
> for (Map.Entry entry : 
> state.bucketStates.entrySet()) {
>   closeCurrentPartFile(entry.getValue());
> {code}
> w.r.t. bucketState.pendingFilesPerCheckpoint , there is similar issue 
> starting line 752:
> {code}
>   Set pastCheckpointIds = 
> bucketState.pendingFilesPerCheckpoint.keySet();
>   LOG.debug("Moving pending files to final location on restore.");
>   for (Long pastCheckpointId : pastCheckpointIds) {
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4662) Bump Calcite version up to 1.9

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523472#comment-15523472
 ] 

ASF GitHub Bot commented on FLINK-4662:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2535


> Bump Calcite version up to 1.9
> --
>
> Key: FLINK-4662
> URL: https://issues.apache.org/jira/browse/FLINK-4662
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Jark Wu
> Fix For: 1.2.0
>
>
> Calcite just released the 1.9 version. We should adopt it also in the Table 
> API especially for FLINK-4294.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4685) Gather operator checkpoint durations data sizes from the runtime

2016-09-26 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-4685:
---

 Summary: Gather operator checkpoint durations data sizes from the 
runtime
 Key: FLINK-4685
 URL: https://issues.apache.org/jira/browse/FLINK-4685
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing
Affects Versions: 1.1.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.2.0






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4410) Report more information about operator checkpoints

2016-09-26 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen updated FLINK-4410:

Description: 
Checkpoint statistics contain the duration of a checkpoint, measured as from 
the CheckpointCoordinator's start to the point when the acknowledge message 
came.

We should additionally expose
  - duration of the synchronous part of a checkpoint
  - duration of the asynchronous part of a checkpoint
  - number of bytes buffered during the stream alignment phase
  - duration of the stream alignment phase

Note: In the case of using *at-least once* semantics, the latter two will 
always be zero.

  was:Checkpoint statistics contain the duration of a checkpoint. We should 
split this time into the synchronous and asynchronous part. This will give more 
insight into the inner workings of the checkpointing mechanism and help users 
better understand what's going on.


> Report more information about operator checkpoints
> --
>
> Key: FLINK-4410
> URL: https://issues.apache.org/jira/browse/FLINK-4410
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing, Webfrontend
>Affects Versions: 1.1.2
>Reporter: Ufuk Celebi
>Assignee: Stephan Ewen
>Priority: Minor
> Fix For: 1.2.0
>
>
> Checkpoint statistics contain the duration of a checkpoint, measured as from 
> the CheckpointCoordinator's start to the point when the acknowledge message 
> came.
> We should additionally expose
>   - duration of the synchronous part of a checkpoint
>   - duration of the asynchronous part of a checkpoint
>   - number of bytes buffered during the stream alignment phase
>   - duration of the stream alignment phase
> Note: In the case of using *at-least once* semantics, the latter two will 
> always be zero.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4410) Report more information about operator checkpoints

2016-09-26 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen updated FLINK-4410:

Fix Version/s: 1.2.0

> Report more information about operator checkpoints
> --
>
> Key: FLINK-4410
> URL: https://issues.apache.org/jira/browse/FLINK-4410
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing, Webfrontend
>Affects Versions: 1.1.2
>Reporter: Ufuk Celebi
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> Checkpoint statistics contain the duration of a checkpoint, measured as from 
> the CheckpointCoordinator's start to the point when the acknowledge message 
> came.
> We should additionally expose
>   - duration of the synchronous part of a checkpoint
>   - duration of the asynchronous part of a checkpoint
>   - number of bytes buffered during the stream alignment phase
>   - duration of the stream alignment phase
> Note: In the case of using *at-least once* semantics, the latter two will 
> always be zero.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4410) Report more information about operator checkpoints

2016-09-26 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen updated FLINK-4410:

Affects Version/s: 1.1.2

> Report more information about operator checkpoints
> --
>
> Key: FLINK-4410
> URL: https://issues.apache.org/jira/browse/FLINK-4410
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing, Webfrontend
>Affects Versions: 1.1.2
>Reporter: Ufuk Celebi
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> Checkpoint statistics contain the duration of a checkpoint, measured as from 
> the CheckpointCoordinator's start to the point when the acknowledge message 
> came.
> We should additionally expose
>   - duration of the synchronous part of a checkpoint
>   - duration of the asynchronous part of a checkpoint
>   - number of bytes buffered during the stream alignment phase
>   - duration of the stream alignment phase
> Note: In the case of using *at-least once* semantics, the latter two will 
> always be zero.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4410) Report more information about operator checkpoints

2016-09-26 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen updated FLINK-4410:

Priority: Major  (was: Minor)

> Report more information about operator checkpoints
> --
>
> Key: FLINK-4410
> URL: https://issues.apache.org/jira/browse/FLINK-4410
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing, Webfrontend
>Affects Versions: 1.1.2
>Reporter: Ufuk Celebi
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> Checkpoint statistics contain the duration of a checkpoint, measured as from 
> the CheckpointCoordinator's start to the point when the acknowledge message 
> came.
> We should additionally expose
>   - duration of the synchronous part of a checkpoint
>   - duration of the asynchronous part of a checkpoint
>   - number of bytes buffered during the stream alignment phase
>   - duration of the stream alignment phase
> Note: In the case of using *at-least once* semantics, the latter two will 
> always be zero.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4410) Report more information about operator checkpoints

2016-09-26 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen updated FLINK-4410:

Component/s: State Backends, Checkpointing

> Report more information about operator checkpoints
> --
>
> Key: FLINK-4410
> URL: https://issues.apache.org/jira/browse/FLINK-4410
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing, Webfrontend
>Affects Versions: 1.1.2
>Reporter: Ufuk Celebi
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> Checkpoint statistics contain the duration of a checkpoint, measured as from 
> the CheckpointCoordinator's start to the point when the acknowledge message 
> came.
> We should additionally expose
>   - duration of the synchronous part of a checkpoint
>   - duration of the asynchronous part of a checkpoint
>   - number of bytes buffered during the stream alignment phase
>   - duration of the stream alignment phase
> Note: In the case of using *at-least once* semantics, the latter two will 
> always be zero.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4410) Report more information about operator checkpoints

2016-09-26 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen updated FLINK-4410:

Summary: Report more information about operator checkpoints  (was: Split 
checkpoint times into synchronous and asynchronous part)

> Report more information about operator checkpoints
> --
>
> Key: FLINK-4410
> URL: https://issues.apache.org/jira/browse/FLINK-4410
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Ufuk Celebi
>Assignee: Stephan Ewen
>Priority: Minor
>
> Checkpoint statistics contain the duration of a checkpoint. We should split 
> this time into the synchronous and asynchronous part. This will give more 
> insight into the inner workings of the checkpointing mechanism and help users 
> better understand what's going on.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2550: [Flink-4657] Implement HighAvailabilityServices ba...

2016-09-26 Thread KurtYoung
GitHub user KurtYoung opened a pull request:

https://github.com/apache/flink/pull/2550

[Flink-4657] Implement HighAvailabilityServices based on zookeeper

This actually contains 3 commits. More details can be found here: 
https://github.com/StephanEwen/incubator-flink/pull/15

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/KurtYoung/flink flink-4657

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2550.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2550


commit b1e9db28d55ef53c58d17bd0fa73da05a1b6570a
Author: Kurt Young 
Date:   2016-09-26T02:59:16Z

[FLINK-4657] Add contains() to submitted job graph store, to indicate 
whether a job needs to be run.

commit b0c88a04b0461a84a05aaba1d649b59a7e13fe7b
Author: Kurt Young 
Date:   2016-09-22T01:07:13Z

[FLINK-4657] Implement HighAvailabilityServices based on zookeeper

commit b8801720c044cba10562ae8cc5b2770cb21cf07a
Author: Kurt Young 
Date:   2016-09-26T15:36:04Z

[FLINK-4657] Implement a few rpc calls for JobMaster




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4294) Allow access of composite type fields

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523361#comment-15523361
 ] 

ASF GitHub Bot commented on FLINK-4294:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2319#discussion_r80500304
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
 ---
@@ -223,10 +223,28 @@ trait ImplicitExpressionOperations {
 */
   def toTime = Cast(expr, SqlTimeTypeInfo.TIME)
 
-/**
+  /**
 * Parses a timestamp String in the form "yy-mm-dd hh:mm:ss.fff" to a 
SQL Timestamp.
 */
   def toTimestamp = Cast(expr, SqlTimeTypeInfo.TIMESTAMP)
+
+  /**
+* Accesses the field of a Flink composite type (such as Tuple, POJO, 
etc.) by name and
+* returns it's value.
+*
+* @param name name of the field (similar to Flink's field expressions)
+* @return value of the field
+*/
+  def getField(name: String) = GetCompositeField(expr, name)
--- End diff --

I think we don't need a wildcard. I will change this PR to support 
`field$subfield`.


> Allow access of composite type fields
> -
>
> Key: FLINK-4294
> URL: https://issues.apache.org/jira/browse/FLINK-4294
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Currently all Flink CompositeTypes are treated as GenericRelDataTypes. It 
> would be better to access individual fields of composite types, too. e.g.
> {code}
> SELECT composite.name FROM composites
> SELECT tuple.f0 FROM tuples
> 'f0.getField(0)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2319: [FLINK-4294] [table] Allow access of composite typ...

2016-09-26 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2319#discussion_r80500304
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
 ---
@@ -223,10 +223,28 @@ trait ImplicitExpressionOperations {
 */
   def toTime = Cast(expr, SqlTimeTypeInfo.TIME)
 
-/**
+  /**
 * Parses a timestamp String in the form "yy-mm-dd hh:mm:ss.fff" to a 
SQL Timestamp.
 */
   def toTimestamp = Cast(expr, SqlTimeTypeInfo.TIMESTAMP)
+
+  /**
+* Accesses the field of a Flink composite type (such as Tuple, POJO, 
etc.) by name and
+* returns it's value.
+*
+* @param name name of the field (similar to Flink's field expressions)
+* @return value of the field
+*/
+  def getField(name: String) = GetCompositeField(expr, name)
--- End diff --

I think we don't need a wildcard. I will change this PR to support 
`field$subfield`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4564) [metrics] Delimiter should be configured per reporter

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523352#comment-15523352
 ] 

ASF GitHub Bot commented on FLINK-4564:
---

Github user ex00 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2517#discussion_r80499420
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 ---
@@ -219,9 +249,16 @@ public ScopeFormats getScopeFormats() {
public void register(Metric metric, String metricName, MetricGroup 
group) {
--- End diff --

In this case I get error in the compile for extending class of 
MetricRegistry. 

For example 
org.apache.flink.runtime.metrics.groups.TaskMetricGroupTest.CountingMetricRegistry#register(Metric
 , String , MetricGroup) and  
org.apache.flink.runtime.metrics.groups.MetricGroupTest.ExceptionOnRegisterRegistry#register(Metric
 , String , MetricGroup)
Cause by:  method does not override or implement a method from a supertype

It is simple tests classes, I can change methods signature in their. But 
will not be any problems in future because of this change in MetricRegistry?


> [metrics] Delimiter should be configured per reporter
> -
>
> Key: FLINK-4564
> URL: https://issues.apache.org/jira/browse/FLINK-4564
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Anton Mushin
>
> Currently, the delimiter used or the scope string is based on a configuration 
> setting shared by all reporters. However, different reporters may have 
> different requirements in regards to the delimiter, as such we should allow 
> reporters to use a different delimiter.
> We can keep the current setting as a global setting that is used if no 
> specific setting was set.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2517: [FLINK-4564] [metrics] Delimiter should be configu...

2016-09-26 Thread ex00
Github user ex00 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2517#discussion_r80499420
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 ---
@@ -219,9 +249,16 @@ public ScopeFormats getScopeFormats() {
public void register(Metric metric, String metricName, MetricGroup 
group) {
--- End diff --

In this case I get error in the compile for extending class of 
MetricRegistry. 

For example 
org.apache.flink.runtime.metrics.groups.TaskMetricGroupTest.CountingMetricRegistry#register(Metric
 , String , MetricGroup) and  
org.apache.flink.runtime.metrics.groups.MetricGroupTest.ExceptionOnRegisterRegistry#register(Metric
 , String , MetricGroup)
Cause by:  method does not override or implement a method from a supertype

It is simple tests classes, I can change methods signature in their. But 
will not be any problems in future because of this change in MetricRegistry?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2369: [FLINK-4035] Add a streaming connector for Apache ...

2016-09-26 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2369#discussion_r80496915
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
 ---
@@ -60,12 +60,17 @@ protected void 
assignPartitionsToConsumer(KafkaConsumer consumer
consumer.assign(topicPartitions);
}
 
+   @Override
+   protected void emitRecord(T record, 
KafkaTopicPartitionState partition, long offset, ConsumerRecord 
consumerRecord) throws Exception {
+   // pass timestamp
+   super.emitRecord(record, partition, offset, 
consumerRecord.timestamp());
+   }
+
/**
 * Emit record Kafka-timestamp aware.
 */
@Override
-   protected  void emitRecord(T record, 
KafkaTopicPartitionState partitionState, long offset, R 
kafkaRecord) throws Exception {
-   long timestamp = ((ConsumerRecord) kafkaRecord).timestamp();
+   protected void emitRecord(T record, 
KafkaTopicPartitionState partitionState, long offset, long 
timestamp) throws Exception {
if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
--- End diff --

Yes, I'll do that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

2016-09-26 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2509
  
I'll rebase this PR soon, probably will also wait for Kafka 0.10 connector 
to be merged.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523340#comment-15523340
 ] 

ASF GitHub Bot commented on FLINK-4280:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2509
  
I'll rebase this PR soon, probably will also wait for Kafka 0.10 connector 
to be merged.


> New Flink-specific option to set starting position of Kafka consumer without 
> respecting external offsets in ZK / Broker
> ---
>
> Key: FLINK-4280
> URL: https://issues.apache.org/jira/browse/FLINK-4280
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Currently, to start reading from the "earliest" and "latest" position in 
> topics for the Flink Kafka consumer, users set the Kafka config 
> {{auto.offset.reset}} in the provided properties configuration.
> However, the way this config actually works might be a bit misleading if 
> users were trying to find a way to "read topics from a starting position". 
> The way the {{auto.offset.reset}} config works in the Flink Kafka consumer 
> resembles Kafka's original intent for the setting: first, existing external 
> offsets committed to the ZK / brokers will be checked; if none exists, then 
> will {{auto.offset.reset}} be respected.
> I propose to add Flink-specific ways to define the starting position, without 
> taking into account the external offsets. The original behaviour (reference 
> external offsets first) can be changed to be a user option, so that the 
> behaviour can be retained for frequent Kafka users that may need some 
> collaboration with existing non-Flink Kafka consumer applications.
> How users will interact with the Flink Kafka consumer after this is added, 
> with a newly introduced {{flink.starting-position}} config:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "earliest/latest");
> props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a 
> warning)
> props.setProperty("group.id", "...") // this won't have effect on the 
> starting position anymore (may still be used in external offset committing)
> ...
> {code}
> Or, reference external offsets in ZK / broker:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "external-offsets");
> props.setProperty("auto.offset.reset", "earliest/latest"); // default will be 
> latest
> props.setProperty("group.id", "..."); // will be used to lookup external 
> offsets in ZK / broker on startup
> ...
> {code}
> A thing we would need to decide on is what would the default value be for 
> {{flink.starting-position}}.
> Two merits I see in adding this:
> 1. This compensates the way users generally interpret "read from a starting 
> position". As the Flink Kafka connector is somewhat essentially a 
> "high-level" Kafka consumer for Flink users, I think it is reasonable to add 
> Flink-specific functionality that users will find useful, although it wasn't 
> supported in Kafka's original consumer designs.
> 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is 
> used only to expose progress to the outside world, and not used to manipulate 
> how Kafka topics are read in Flink (unless users opt to do so)" is even more 
> definite and solid. There was some discussion in this PR 
> (https://github.com/apache/flink/pull/1690, FLINK-3398) on this aspect. I 
> think adding this "decouples" more Flink's internal offset checkpointing from 
> the external Kafka's offset store.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2369: [FLINK-4035] Add a streaming connector for Apache Kafka 0...

2016-09-26 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2369
  
I'm currently working on rebasing the PR and addressing the comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523334#comment-15523334
 ] 

ASF GitHub Bot commented on FLINK-4035:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2369
  
I'm currently working on rebasing the PR and addressing the comments.


> Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
> ---
>
> Key: FLINK-4035
> URL: https://issues.apache.org/jira/browse/FLINK-4035
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Minor
>
> Kafka 0.10.0.0 introduced protocol changes related to the producer.  
> Published messages now include timestamps and compressed messages now include 
> relative offsets.  As it is now, brokers must decompress publisher compressed 
> messages, assign offset to them, and recompress them, which is wasteful and 
> makes it less likely that compression will be used at all.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4684) Remove obsolete classloader from CheckpointCoordinator

2016-09-26 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-4684:
---

 Summary: Remove obsolete classloader from CheckpointCoordinator
 Key: FLINK-4684
 URL: https://issues.apache.org/jira/browse/FLINK-4684
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.2.0
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Priority: Minor
 Fix For: 1.2.0


With the latest checkpointing changes, the {{CheckpointCoordinator}} should not 
execute user code any more, and this not use a User Code ClassLoader any more.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523328#comment-15523328
 ] 

ASF GitHub Bot commented on FLINK-4280:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2509
  
Hi @gyfora,
Yes, it is absolutely possible to add that. There's actually a JIRA for 
that feature too 
([FLINK-3123](https://issues.apache.org/jira/browse/FLINK-3123)), so I'd say we 
can add that feature on top of the proposed changes here, as a separate follow 
up PR after this one?

One note though, the API for that feature would need to be able to specify 
offsets for partitions of different topics, since the Kafka consumers can 
subscribe multiple topics. So, `Map` wouldn't fit this case, 
probably would be better off having a new user-facing class as the argument to 
define the offsets.


> New Flink-specific option to set starting position of Kafka consumer without 
> respecting external offsets in ZK / Broker
> ---
>
> Key: FLINK-4280
> URL: https://issues.apache.org/jira/browse/FLINK-4280
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Currently, to start reading from the "earliest" and "latest" position in 
> topics for the Flink Kafka consumer, users set the Kafka config 
> {{auto.offset.reset}} in the provided properties configuration.
> However, the way this config actually works might be a bit misleading if 
> users were trying to find a way to "read topics from a starting position". 
> The way the {{auto.offset.reset}} config works in the Flink Kafka consumer 
> resembles Kafka's original intent for the setting: first, existing external 
> offsets committed to the ZK / brokers will be checked; if none exists, then 
> will {{auto.offset.reset}} be respected.
> I propose to add Flink-specific ways to define the starting position, without 
> taking into account the external offsets. The original behaviour (reference 
> external offsets first) can be changed to be a user option, so that the 
> behaviour can be retained for frequent Kafka users that may need some 
> collaboration with existing non-Flink Kafka consumer applications.
> How users will interact with the Flink Kafka consumer after this is added, 
> with a newly introduced {{flink.starting-position}} config:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "earliest/latest");
> props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a 
> warning)
> props.setProperty("group.id", "...") // this won't have effect on the 
> starting position anymore (may still be used in external offset committing)
> ...
> {code}
> Or, reference external offsets in ZK / broker:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "external-offsets");
> props.setProperty("auto.offset.reset", "earliest/latest"); // default will be 
> latest
> props.setProperty("group.id", "..."); // will be used to lookup external 
> offsets in ZK / broker on startup
> ...
> {code}
> A thing we would need to decide on is what would the default value be for 
> {{flink.starting-position}}.
> Two merits I see in adding this:
> 1. This compensates the way users generally interpret "read from a starting 
> position". As the Flink Kafka connector is somewhat essentially a 
> "high-level" Kafka consumer for Flink users, I think it is reasonable to add 
> Flink-specific functionality that users will find useful, although it wasn't 
> supported in Kafka's original consumer designs.
> 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is 
> used only to expose progress to the outside world, and not used to manipulate 
> how Kafka topics are read in Flink (unless users opt to do so)" is even more 
> definite and solid. There was some discussion in this PR 
> (https://github.com/apache/flink/pull/1690, FLINK-3398) on this aspect. I 
> think adding this "decouples" more Flink's internal offset checkpointing from 
> the external Kafka's offset store.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523327#comment-15523327
 ] 

ASF GitHub Bot commented on FLINK-4035:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2369#discussion_r80496915
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
 ---
@@ -60,12 +60,17 @@ protected void 
assignPartitionsToConsumer(KafkaConsumer consumer
consumer.assign(topicPartitions);
}
 
+   @Override
+   protected void emitRecord(T record, 
KafkaTopicPartitionState partition, long offset, ConsumerRecord 
consumerRecord) throws Exception {
+   // pass timestamp
+   super.emitRecord(record, partition, offset, 
consumerRecord.timestamp());
+   }
+
/**
 * Emit record Kafka-timestamp aware.
 */
@Override
-   protected  void emitRecord(T record, 
KafkaTopicPartitionState partitionState, long offset, R 
kafkaRecord) throws Exception {
-   long timestamp = ((ConsumerRecord) kafkaRecord).timestamp();
+   protected void emitRecord(T record, 
KafkaTopicPartitionState partitionState, long offset, long 
timestamp) throws Exception {
if (timestampWatermarkMode == NO_TIMESTAMPS_WATERMARKS) {
--- End diff --

Yes, I'll do that.


> Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
> ---
>
> Key: FLINK-4035
> URL: https://issues.apache.org/jira/browse/FLINK-4035
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Minor
>
> Kafka 0.10.0.0 introduced protocol changes related to the producer.  
> Published messages now include timestamps and compressed messages now include 
> relative offsets.  As it is now, brokers must decompress publisher compressed 
> messages, assign offset to them, and recompress them, which is wasteful and 
> makes it less likely that compression will be used at all.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

2016-09-26 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2509
  
Hi @gyfora,
Yes, it is absolutely possible to add that. There's actually a JIRA for 
that feature too 
([FLINK-3123](https://issues.apache.org/jira/browse/FLINK-3123)), so I'd say we 
can add that feature on top of the proposed changes here, as a separate follow 
up PR after this one?

One note though, the API for that feature would need to be able to specify 
offsets for partitions of different topics, since the Kafka consumers can 
subscribe multiple topics. So, `Map` wouldn't fit this case, 
probably would be better off having a new user-facing class as the argument to 
define the offsets.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2369: [FLINK-4035] Add a streaming connector for Apache ...

2016-09-26 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2369#discussion_r80491475
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
 ---
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 
compatible with Kafka 0.10.x
+ *
+ * Implementation note: This producer is a hybrid between a regular 
regular sink function (a)
+ * and a custom operator (b).
+ *
+ * For (a), the class implements the SinkFunction and RichFunction 
interfaces.
+ * For (b), it extends the StreamTask class.
+ *
+ * Details about approach (a):
+ *
+ *  Pre Kafka 0.10 producers only follow approach (a), allowing users to 
use the producer using the
+ *  DataStream.addSink() method.
+ *  Since the APIs exposed in that variant do not allow accessing the the 
timestamp attached to the record
+ *  the Kafka 0.10 producer has a section invocation option, approach (b).
+ *
+ * Details about approach (b):
+ *  Kafka 0.10 supports writing the timestamp attached to a record to 
Kafka. When adding the
+ *  FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafka() 
method, the Kafka producer
+ *  can access the internal record timestamp of the record and write it to 
Kafka.
+ *
+ * All methods and constructors in this class are marked with the approach 
they are needed for.
+ */
+public class FlinkKafkaProducer010 extends StreamSink implements 
SinkFunction, RichFunction {
+
+   /**
+* Flag controlling whether we are writing the Flink record's timestamp 
into Kafka.
+*/
+   private boolean writeTimestampToKafka = false;
+
+   // -- "Constructors" for timestamp writing 
--
+
+   /**
+* Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
+* the topic.
+*
+* This constructor allows writing timestamps to Kafka, it follow 
approach (b) (see above)
+*
+* @param inStream The stream to write to Kafka
+* @param topicId ID of the Kafka topic.
+* @param serializationSchema User defined serialization schema 
supporting key/value messages
+* @param producerConfig Properties with the producer configuration.
+*/
+   public static  FlinkKafkaProducer010Configuration 
writeToKafka(DataStream inStream,
+  

[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523269#comment-15523269
 ] 

ASF GitHub Bot commented on FLINK-4035:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2369#discussion_r80491475
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
 ---
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 
compatible with Kafka 0.10.x
+ *
+ * Implementation note: This producer is a hybrid between a regular 
regular sink function (a)
+ * and a custom operator (b).
+ *
+ * For (a), the class implements the SinkFunction and RichFunction 
interfaces.
+ * For (b), it extends the StreamTask class.
+ *
+ * Details about approach (a):
+ *
+ *  Pre Kafka 0.10 producers only follow approach (a), allowing users to 
use the producer using the
+ *  DataStream.addSink() method.
+ *  Since the APIs exposed in that variant do not allow accessing the the 
timestamp attached to the record
+ *  the Kafka 0.10 producer has a section invocation option, approach (b).
+ *
+ * Details about approach (b):
+ *  Kafka 0.10 supports writing the timestamp attached to a record to 
Kafka. When adding the
+ *  FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafka() 
method, the Kafka producer
+ *  can access the internal record timestamp of the record and write it to 
Kafka.
+ *
+ * All methods and constructors in this class are marked with the approach 
they are needed for.
+ */
+public class FlinkKafkaProducer010 extends StreamSink implements 
SinkFunction, RichFunction {
+
+   /**
+* Flag controlling whether we are writing the Flink record's timestamp 
into Kafka.
+*/
+   private boolean writeTimestampToKafka = false;
+
+   // -- "Constructors" for timestamp writing 
--
+
+   /**
+* Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
+* the topic.
+*
+* This constructor allows writing timestamps to Kafka, it follow 
approach (b) (see above)
+*
+* @param inStream The stream to write to Kafka
+* @param topicId ID of the Kafka topic.
+* @param serializationSchema User defined serialization schema 
supporting 

[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523203#comment-15523203
 ] 

ASF GitHub Bot commented on FLINK-4280:
---

Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/2509
  
Hi,

I like the proposed changes, do you think it would make sense to add the 
possibility to set specific offsets on a per partition basis?

```
kafka.setStartOffsets(Map partitionOffsets)
```

I think this is extremely useful in production use.


> New Flink-specific option to set starting position of Kafka consumer without 
> respecting external offsets in ZK / Broker
> ---
>
> Key: FLINK-4280
> URL: https://issues.apache.org/jira/browse/FLINK-4280
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Currently, to start reading from the "earliest" and "latest" position in 
> topics for the Flink Kafka consumer, users set the Kafka config 
> {{auto.offset.reset}} in the provided properties configuration.
> However, the way this config actually works might be a bit misleading if 
> users were trying to find a way to "read topics from a starting position". 
> The way the {{auto.offset.reset}} config works in the Flink Kafka consumer 
> resembles Kafka's original intent for the setting: first, existing external 
> offsets committed to the ZK / brokers will be checked; if none exists, then 
> will {{auto.offset.reset}} be respected.
> I propose to add Flink-specific ways to define the starting position, without 
> taking into account the external offsets. The original behaviour (reference 
> external offsets first) can be changed to be a user option, so that the 
> behaviour can be retained for frequent Kafka users that may need some 
> collaboration with existing non-Flink Kafka consumer applications.
> How users will interact with the Flink Kafka consumer after this is added, 
> with a newly introduced {{flink.starting-position}} config:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "earliest/latest");
> props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a 
> warning)
> props.setProperty("group.id", "...") // this won't have effect on the 
> starting position anymore (may still be used in external offset committing)
> ...
> {code}
> Or, reference external offsets in ZK / broker:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "external-offsets");
> props.setProperty("auto.offset.reset", "earliest/latest"); // default will be 
> latest
> props.setProperty("group.id", "..."); // will be used to lookup external 
> offsets in ZK / broker on startup
> ...
> {code}
> A thing we would need to decide on is what would the default value be for 
> {{flink.starting-position}}.
> Two merits I see in adding this:
> 1. This compensates the way users generally interpret "read from a starting 
> position". As the Flink Kafka connector is somewhat essentially a 
> "high-level" Kafka consumer for Flink users, I think it is reasonable to add 
> Flink-specific functionality that users will find useful, although it wasn't 
> supported in Kafka's original consumer designs.
> 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is 
> used only to expose progress to the outside world, and not used to manipulate 
> how Kafka topics are read in Flink (unless users opt to do so)" is even more 
> definite and solid. There was some discussion in this PR 
> (https://github.com/apache/flink/pull/1690, FLINK-3398) on this aspect. I 
> think adding this "decouples" more Flink's internal offset checkpointing from 
> the external Kafka's offset store.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2509: [FLINK-4280][kafka-connector] Explicit start position con...

2016-09-26 Thread gyfora
Github user gyfora commented on the issue:

https://github.com/apache/flink/pull/2509
  
Hi,

I like the proposed changes, do you think it would make sense to add the 
possibility to set specific offsets on a per partition basis?

```
kafka.setStartOffsets(Map partitionOffsets)
```

I think this is extremely useful in production use.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4564) [metrics] Delimiter should be configured per reporter

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523161#comment-15523161
 ] 

ASF GitHub Bot commented on FLINK-4564:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2517#discussion_r80475991
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
 ---
@@ -280,4 +282,65 @@ public void testConfigurableDelimiter() {
 
registry.shutdown();
}
+
+   @Test
+   public void testConfigurableDelimiterForReporters() {
+   Configuration config = new Configuration();
+   config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test1,test2,test3");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
+
+   MetricRegistry registry = new MetricRegistry(config);
+
+   assertEquals('.', registry.getDelimiter());
+   assertEquals('_', registry.getDelimiter(0));
+   assertEquals('-', registry.getDelimiter(1));
+   assertEquals('.', registry.getDelimiter(2));
+   assertEquals('.', registry.getDelimiter(3));
+   assertEquals('.', registry.getDelimiter(-1));
+
+   registry.shutdown();
+   }
+
+   @Test
+   public void testConfigurableDelimiterForReportersInGroup() {
+   Configuration config = new Configuration();
+   config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test1,test2,test3,test4");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test4." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
+   config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, 
"A.B");
+
+   MetricRegistry registry = new MetricRegistry(config);
+   List reporters = registry.getReporters();
+   ((TestReporter8)reporters.get(0)).expectedDelimiter = '_'; 
//test1  reporter
+   ((TestReporter8)reporters.get(1)).expectedDelimiter = '-'; 
//test2 reporter
+   ((TestReporter8)reporters.get(2)).expectedDelimiter = '.'; 
//test3 reporter, because 'AA' - not correct delimiter
+   //for test4 reporter use global delimiter
+
--- End diff --

are you sure that this test accurately detects errors? if the assert within 
notify() fails an exception is thrown, which however is never propagated but 
only logged.

It would however be correct if you execute countCall++ after 
assertEquals(), as you only reach it for a correct delimiter. Then of course it 
should be renamed to something like "numCorrectDelimiters" or something.


> [metrics] Delimiter should be configured per reporter
> -
>
> Key: FLINK-4564
> URL: https://issues.apache.org/jira/browse/FLINK-4564
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: 

[jira] [Commented] (FLINK-4564) [metrics] Delimiter should be configured per reporter

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523162#comment-15523162
 ] 

ASF GitHub Bot commented on FLINK-4564:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2517#discussion_r80474552
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 ---
@@ -219,9 +249,16 @@ public ScopeFormats getScopeFormats() {
public void register(Metric metric, String metricName, MetricGroup 
group) {
--- End diff --

if you change the signature to (Metric, String, AbstractMetricGroup) you 
would no longer need the cast on L257.


> [metrics] Delimiter should be configured per reporter
> -
>
> Key: FLINK-4564
> URL: https://issues.apache.org/jira/browse/FLINK-4564
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Anton Mushin
>
> Currently, the delimiter used or the scope string is based on a configuration 
> setting shared by all reporters. However, different reporters may have 
> different requirements in regards to the delimiter, as such we should allow 
> reporters to use a different delimiter.
> We can keep the current setting as a global setting that is used if no 
> specific setting was set.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4564) [metrics] Delimiter should be configured per reporter

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523156#comment-15523156
 ] 

ASF GitHub Bot commented on FLINK-4564:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2517#discussion_r80480187
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
 ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * When need know position number for reporter in  metric group
+ *
+ * @param  reference to {@link AbstractMetricGroup AbstractMetricGroup}
+ */
+public class FrontMetricGroup> implements 
MetricGroup {
+
+   protected A reference;
+   protected int index;
--- End diff --

should be called reporterIndex.


> [metrics] Delimiter should be configured per reporter
> -
>
> Key: FLINK-4564
> URL: https://issues.apache.org/jira/browse/FLINK-4564
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Anton Mushin
>
> Currently, the delimiter used or the scope string is based on a configuration 
> setting shared by all reporters. However, different reporters may have 
> different requirements in regards to the delimiter, as such we should allow 
> reporters to use a different delimiter.
> We can keep the current setting as a global setting that is used if no 
> specific setting was set.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4564) [metrics] Delimiter should be configured per reporter

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523159#comment-15523159
 ] 

ASF GitHub Bot commented on FLINK-4564:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2517#discussion_r80477644
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
 ---
@@ -280,4 +282,65 @@ public void testConfigurableDelimiter() {
 
registry.shutdown();
}
+
+   @Test
+   public void testConfigurableDelimiterForReporters() {
+   Configuration config = new Configuration();
+   config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test1,test2,test3");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
+
+   MetricRegistry registry = new MetricRegistry(config);
+
+   assertEquals('.', registry.getDelimiter());
+   assertEquals('_', registry.getDelimiter(0));
+   assertEquals('-', registry.getDelimiter(1));
+   assertEquals('.', registry.getDelimiter(2));
+   assertEquals('.', registry.getDelimiter(3));
+   assertEquals('.', registry.getDelimiter(-1));
+
+   registry.shutdown();
+   }
+
+   @Test
+   public void testConfigurableDelimiterForReportersInGroup() {
+   Configuration config = new Configuration();
+   config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test1,test2,test3,test4");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test4." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
+   config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, 
"A.B");
+
+   MetricRegistry registry = new MetricRegistry(config);
+   List reporters = registry.getReporters();
+   ((TestReporter8)reporters.get(0)).expectedDelimiter = '_'; 
//test1  reporter
--- End diff --

for clarity purposes we should explicitly set the expectedDelimiter here.


> [metrics] Delimiter should be configured per reporter
> -
>
> Key: FLINK-4564
> URL: https://issues.apache.org/jira/browse/FLINK-4564
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Anton Mushin
>
> Currently, the delimiter used or the scope string is based on a configuration 
> setting shared by all reporters. However, different reporters may have 
> different requirements in regards to the delimiter, as such we should allow 
> reporters to use a different delimiter.
> We can keep the current setting as a global setting that is used if no 
> specific setting was set.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2517: [FLINK-4564] [metrics] Delimiter should be configu...

2016-09-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2517#discussion_r80474584
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 ---
@@ -219,9 +249,16 @@ public ScopeFormats getScopeFormats() {
public void register(Metric metric, String metricName, MetricGroup 
group) {
try {
if (reporters != null) {
-   for (MetricReporter reporter : reporters) {
+   for (int i = 0; i < reporters.size(); i++) {
+   MetricReporter reporter = 
reporters.get(i);
if (reporter != null) {
-   
reporter.notifyOfAddedMetric(metric, metricName, group);
+   if (group instanceof 
AbstractMetricGroup) {
--- End diff --

this will always be true.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2517: [FLINK-4564] [metrics] Delimiter should be configu...

2016-09-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2517#discussion_r80474552
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 ---
@@ -219,9 +249,16 @@ public ScopeFormats getScopeFormats() {
public void register(Metric metric, String metricName, MetricGroup 
group) {
--- End diff --

if you change the signature to (Metric, String, AbstractMetricGroup) you 
would no longer need the cast on L257.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4564) [metrics] Delimiter should be configured per reporter

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523158#comment-15523158
 ] 

ASF GitHub Bot commented on FLINK-4564:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2517#discussion_r80481547
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
 ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * When need know position number for reporter in  metric group
--- End diff --

"Metric group which forwards all registration calls to a varaible parent 
metric group that injects a variable reporter index into calls to {@link 
org.apache.flink.metrics.MetricGroup#getMetricIdentifier(String)} or {@link 
org.apache.flink.metrics.MetricGroup#getMetricIdentifier(String, 
CharacterFilter)}. This allows us to use reporter-specific delimiters, without 
requiring any action by the reporter."


> [metrics] Delimiter should be configured per reporter
> -
>
> Key: FLINK-4564
> URL: https://issues.apache.org/jira/browse/FLINK-4564
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Anton Mushin
>
> Currently, the delimiter used or the scope string is based on a configuration 
> setting shared by all reporters. However, different reporters may have 
> different requirements in regards to the delimiter, as such we should allow 
> reporters to use a different delimiter.
> We can keep the current setting as a global setting that is used if no 
> specific setting was set.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4564) [metrics] Delimiter should be configured per reporter

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523155#comment-15523155
 ] 

ASF GitHub Bot commented on FLINK-4564:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2517#discussion_r80477622
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
 ---
@@ -280,4 +282,65 @@ public void testConfigurableDelimiter() {
 
registry.shutdown();
}
+
+   @Test
+   public void testConfigurableDelimiterForReporters() {
+   Configuration config = new Configuration();
+   config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test1,test2,test3");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
+
+   MetricRegistry registry = new MetricRegistry(config);
+
+   assertEquals('.', registry.getDelimiter());
--- End diff --

introducing a static final char field for the global delimiter would 
improve readability.


> [metrics] Delimiter should be configured per reporter
> -
>
> Key: FLINK-4564
> URL: https://issues.apache.org/jira/browse/FLINK-4564
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Anton Mushin
>
> Currently, the delimiter used or the scope string is based on a configuration 
> setting shared by all reporters. However, different reporters may have 
> different requirements in regards to the delimiter, as such we should allow 
> reporters to use a different delimiter.
> We can keep the current setting as a global setting that is used if no 
> specific setting was set.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2517: [FLINK-4564] [metrics] Delimiter should be configu...

2016-09-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2517#discussion_r80473933
  
--- Diff: docs/monitoring/metrics.md ---
@@ -280,6 +280,7 @@ Metrics can be exposed to an external system by 
configuring one or several repor
 - `metrics.reporter..`: Generic setting `` for the 
reporter named ``.
 - `metrics.reporter..class`: The reporter class to use for the 
reporter named ``.
 - `metrics.reporter..interval`: The reporter interval to use for the 
reporter named ``.
+- `metrics.reporter..scope.delimiter`: The delimiter for to use for 
the identifier (default value use `metrics.scope.delimiter`) for the reporter 
named ``.
--- End diff --

for to use -> to use


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2517: [FLINK-4564] [metrics] Delimiter should be configu...

2016-09-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2517#discussion_r80480187
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
 ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * When need know position number for reporter in  metric group
+ *
+ * @param  reference to {@link AbstractMetricGroup AbstractMetricGroup}
+ */
+public class FrontMetricGroup> implements 
MetricGroup {
+
+   protected A reference;
+   protected int index;
--- End diff --

should be called reporterIndex.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2517: [FLINK-4564] [metrics] Delimiter should be configu...

2016-09-26 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2517#discussion_r80477622
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
 ---
@@ -280,4 +282,65 @@ public void testConfigurableDelimiter() {
 
registry.shutdown();
}
+
+   @Test
+   public void testConfigurableDelimiterForReporters() {
+   Configuration config = new Configuration();
+   config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test1,test2,test3");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
+
+   MetricRegistry registry = new MetricRegistry(config);
+
+   assertEquals('.', registry.getDelimiter());
--- End diff --

introducing a static final char field for the global delimiter would 
improve readability.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4564) [metrics] Delimiter should be configured per reporter

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523157#comment-15523157
 ] 

ASF GitHub Bot commented on FLINK-4564:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2517#discussion_r80474584
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 ---
@@ -219,9 +249,16 @@ public ScopeFormats getScopeFormats() {
public void register(Metric metric, String metricName, MetricGroup 
group) {
try {
if (reporters != null) {
-   for (MetricReporter reporter : reporters) {
+   for (int i = 0; i < reporters.size(); i++) {
+   MetricReporter reporter = 
reporters.get(i);
if (reporter != null) {
-   
reporter.notifyOfAddedMetric(metric, metricName, group);
+   if (group instanceof 
AbstractMetricGroup) {
--- End diff --

this will always be true.


> [metrics] Delimiter should be configured per reporter
> -
>
> Key: FLINK-4564
> URL: https://issues.apache.org/jira/browse/FLINK-4564
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Anton Mushin
>
> Currently, the delimiter used or the scope string is based on a configuration 
> setting shared by all reporters. However, different reporters may have 
> different requirements in regards to the delimiter, as such we should allow 
> reporters to use a different delimiter.
> We can keep the current setting as a global setting that is used if no 
> specific setting was set.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4564) [metrics] Delimiter should be configured per reporter

2016-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15523163#comment-15523163
 ] 

ASF GitHub Bot commented on FLINK-4564:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2517#discussion_r80473933
  
--- Diff: docs/monitoring/metrics.md ---
@@ -280,6 +280,7 @@ Metrics can be exposed to an external system by 
configuring one or several repor
 - `metrics.reporter..`: Generic setting `` for the 
reporter named ``.
 - `metrics.reporter..class`: The reporter class to use for the 
reporter named ``.
 - `metrics.reporter..interval`: The reporter interval to use for the 
reporter named ``.
+- `metrics.reporter..scope.delimiter`: The delimiter for to use for 
the identifier (default value use `metrics.scope.delimiter`) for the reporter 
named ``.
--- End diff --

for to use -> to use


> [metrics] Delimiter should be configured per reporter
> -
>
> Key: FLINK-4564
> URL: https://issues.apache.org/jira/browse/FLINK-4564
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Anton Mushin
>
> Currently, the delimiter used or the scope string is based on a configuration 
> setting shared by all reporters. However, different reporters may have 
> different requirements in regards to the delimiter, as such we should allow 
> reporters to use a different delimiter.
> We can keep the current setting as a global setting that is used if no 
> specific setting was set.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


  1   2   3   >