[jira] [Updated] (FLINK-8532) RebalancePartitioner should use Random value for its first partition

2018-01-30 Thread Yuta Morisawa (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuta Morisawa updated FLINK-8532:
-
Description: 
In some conditions, RebalancePartitioner does not balance data correctly 
because it uses the same value for selecting the next operator.

RebalancePartitioner initializes its partition id with the same value in every 
thread, so it does balance data overall, but at any given moment the amount of 
data in each operator is skewed.

The skew becomes severe in particular when the data rates of the upstream 
operators are equal.

Example:

Consider a simple operator chain.

-> map1 -> rebalance -> map2 ->

Each map operator (map1, map2) has three subtasks (subtasks 1-3 in map1, 
subtasks 4-6 in map2).

map1    map2
 st1     st4
 st2     st5
 st3     st6

At the beginning, every subtask in map1 sends data to st4 in map2 because they 
all use the same initial partition id.

The next time map1 receives data, st1, st2, and st3 all send it to st5 because 
each incremented its partition id when processing the previous record.

In my environment, processing takes twice as long with RebalancePartitioner as 
with the other partitioners (rescale, keyBy).

To solve this problem, in my opinion, RebalancePartitioner should use its own 
operator id for the initial value.
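
For illustration, a minimal sketch of the proposed fix, assuming a round-robin 
partitioner whose first channel index is seeded per subtask (e.g. randomly or 
from the operator id); this is a sketch of the idea, not the actual patch:

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch: a rebalance-style partitioner whose starting channel differs per
// subtask, so parallel senders do not all hit the same downstream subtask first.
public class RandomStartRebalancePartitioner {

    private final int numberOfChannels;
    private int nextChannel;

    public RandomStartRebalancePartitioner(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
        // Random (or operator-id-derived) seed instead of a constant,
        // so the parallel senders start at different channels.
        this.nextChannel = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }

    public int selectChannel() {
        // Plain round robin after the randomized start.
        nextChannel = (nextChannel + 1) % numberOfChannels;
        return nextChannel;
    }
}
```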

 

  was:
In some conditions, RebalancePartitioner does not balance data correctly 
because it uses the same value for selecting the next operator.

RebalancePartitioner initializes its partition id with the same value in every 
thread, so it does balance data overall, but at any given moment the amount of 
data in each operator is skewed.

The skew becomes severe in particular when the data rates of the upstream 
operators are equal.

Example:

Consider a simple operator chain.

--> map1 ---rebalance--> map2 -->

Each map operator (map1, map2) has three subtasks (subtasks 1-3 in map1, 
subtasks 4-6 in map2).

map1    map2
 st1     st4
 st2     st5
 st3     st6

At the beginning, every subtask in map1 sends data to st4 in map2 because they 
all use the same initial partition id.

The next time map1 receives data, st1, st2, and st3 all send it to st5 because 
each incremented its partition id when processing the previous record.

In my environment, processing takes twice as long with RebalancePartitioner as 
with the other partitioners (rescale, keyBy).

To solve this problem, in my opinion, RebalancePartitioner should use its own 
operator id for the initial value.

 


> RebalancePartitioner should use Random value for its first partition
> 
>
> Key: FLINK-8532
> URL: https://issues.apache.org/jira/browse/FLINK-8532
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Yuta Morisawa
>Priority: Minor
>
> In some conditions, RebalancePartitioner does not balance data correctly 
> because it uses the same value for selecting the next operator.
> RebalancePartitioner initializes its partition id with the same value in 
> every thread, so it does balance data overall, but at any given moment the 
> amount of data in each operator is skewed.
> The skew becomes severe in particular when the data rates of the upstream 
> operators are equal.
>  
> Example:
> Consider a simple operator chain.
> -> map1 -> rebalance -> map2 ->
> Each map operator (map1, map2) has three subtasks (subtasks 1-3 in map1, 
> subtasks 4-6 in map2).
> map1    map2
>  st1     st4
>  st2     st5
>  st3     st6
>  
> At the beginning, every subtask in map1 sends data to st4 in map2 because 
> they all use the same initial partition id.
> The next time map1 receives data, st1, st2, and st3 all send it to st5 
> because each incremented its partition id when processing the previous record.
> In my environment, processing takes twice as long with RebalancePartitioner 
> as with the other partitioners (rescale, keyBy).
> To solve this problem, in my opinion, RebalancePartitioner should use its own 
> operator id for the initial value.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8357) enable rolling in default log settings

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346376#comment-16346376
 ] 

ASF GitHub Bot commented on FLINK-8357:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5371
  
I checked, and the Travis error is not relevant to this issue.


> enable rolling in default log settings
> --
>
> Key: FLINK-8357
> URL: https://issues.apache.org/jira/browse/FLINK-8357
> Project: Flink
>  Issue Type: Improvement
>  Components: Logging
>Reporter: Xu Mingmin
>Assignee: mingleizhang
>Priority: Major
> Fix For: 1.5.0
>
>
> The release packages use {{org.apache.log4j.FileAppender}} for log4j and 
> {{ch.qos.logback.core.FileAppender}} for logback, which can result in very 
> large log files. 
> For most cases, if not all, we need to enable rotation in a production 
> cluster, and I suppose it's a good idea to make rotation the default.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8532) RebalancePartitioner should use Random value for its first partition

2018-01-30 Thread Yuta Morisawa (JIRA)
Yuta Morisawa created FLINK-8532:


 Summary: RebalancePartitioner should use Random value for its 
first partition
 Key: FLINK-8532
 URL: https://issues.apache.org/jira/browse/FLINK-8532
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Yuta Morisawa


In some conditions, RebalancePartitioner does not balance data correctly 
because it uses the same value for selecting the next operator.

RebalancePartitioner initializes its partition id with the same value in every 
thread, so it does balance data overall, but at any given moment the amount of 
data in each operator is skewed.

The skew becomes severe in particular when the data rates of the upstream 
operators are equal.

Example:

Consider a simple operator chain.

---> map1 ---rebalance---> map2 --->

Each map operator (map1, map2) has three subtasks (subtasks 1-3 in map1, 
subtasks 4-6 in map2).

map1    map2
 st1     st4
 st2     st5
 st3     st6

At the beginning, every subtask in map1 sends data to st4 in map2 because they 
all use the same initial partition id.

The next time map1 receives data, st1, st2, and st3 all send it to st5 because 
each incremented its partition id when processing the previous record.

In my environment, processing takes twice as long with RebalancePartitioner as 
with the other partitioners (rescale, keyBy).

To solve this problem, in my opinion, RebalancePartitioner should use its own 
operator id for the initial value.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8532) RebalancePartitioner should use Random value for its first partition

2018-01-30 Thread Yuta Morisawa (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuta Morisawa updated FLINK-8532:
-
Description: 
In some conditions, RebalancePartitioner does not balance data correctly 
because it uses the same value for selecting the next operator.

RebalancePartitioner initializes its partition id with the same value in every 
thread, so it does balance data overall, but at any given moment the amount of 
data in each operator is skewed.

The skew becomes severe in particular when the data rates of the upstream 
operators are equal.

Example:

Consider a simple operator chain.

--> map1 ---rebalance--> map2 -->

Each map operator (map1, map2) has three subtasks (subtasks 1-3 in map1, 
subtasks 4-6 in map2).

map1    map2
 st1     st4
 st2     st5
 st3     st6

At the beginning, every subtask in map1 sends data to st4 in map2 because they 
all use the same initial partition id.

The next time map1 receives data, st1, st2, and st3 all send it to st5 because 
each incremented its partition id when processing the previous record.

In my environment, processing takes twice as long with RebalancePartitioner as 
with the other partitioners (rescale, keyBy).

To solve this problem, in my opinion, RebalancePartitioner should use its own 
operator id for the initial value.

 

  was:
In some conditions, RebalancePartitioner does not balance data correctly 
because it uses the same value for selecting the next operator.

RebalancePartitioner initializes its partition id with the same value in every 
thread, so it does balance data overall, but at any given moment the amount of 
data in each operator is skewed.

The skew becomes severe in particular when the data rates of the upstream 
operators are equal.

Example:

Consider a simple operator chain.

---> map1 ---rebalance---> map2 --->

Each map operator (map1, map2) has three subtasks (subtasks 1-3 in map1, 
subtasks 4-6 in map2).

map1    map2
 st1     st4
 st2     st5
 st3     st6

At the beginning, every subtask in map1 sends data to st4 in map2 because they 
all use the same initial partition id.

The next time map1 receives data, st1, st2, and st3 all send it to st5 because 
each incremented its partition id when processing the previous record.

In my environment, processing takes twice as long with RebalancePartitioner as 
with the other partitioners (rescale, keyBy).

To solve this problem, in my opinion, RebalancePartitioner should use its own 
operator id for the initial value.

 


> RebalancePartitioner should use Random value for its first partition
> 
>
> Key: FLINK-8532
> URL: https://issues.apache.org/jira/browse/FLINK-8532
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Yuta Morisawa
>Priority: Minor
>
> In some conditions, RebalancePartitioner does not balance data correctly 
> because it uses the same value for selecting the next operator.
> RebalancePartitioner initializes its partition id with the same value in 
> every thread, so it does balance data overall, but at any given moment the 
> amount of data in each operator is skewed.
> The skew becomes severe in particular when the data rates of the upstream 
> operators are equal.
>  
> Example:
> Consider a simple operator chain.
> --> map1 ---rebalance--> map2 -->
> Each map operator (map1, map2) has three subtasks (subtasks 1-3 in map1, 
> subtasks 4-6 in map2).
> map1    map2
>  st1     st4
>  st2     st5
>  st3     st6
>  
> At the beginning, every subtask in map1 sends data to st4 in map2 because 
> they all use the same initial partition id.
> The next time map1 receives data, st1, st2, and st3 all send it to st5 
> because each incremented its partition id when processing the previous record.
> In my environment, processing takes twice as long with RebalancePartitioner 
> as with the other partitioners (rescale, keyBy).
> To solve this problem, in my opinion, RebalancePartitioner should use its own 
> operator id for the initial value.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5371: [FLINK-8357] [conf] Enable rolling in default log setting...

2018-01-30 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5371
  
I checked, and the Travis error is not relevant to this issue.


---


[jira] [Commented] (FLINK-7095) Add proper command line parsing tool to TaskManagerRunner.main

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346366#comment-16346366
 ] 

ASF GitHub Bot commented on FLINK-7095:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5375
  
@tillrohrmann I just checked: it seems there is no ```Option``` registered 
yet for parsing the ```configDir``` parameter. I could either register the 
```configDir``` option somewhere, such as in the ```CliFrontendParser``` 
class, or directly use ```TaskManager.parseArgsAndLoadConfig(args);``` to get 
the ```Configuration```. What do you think? @tillrohrmann Thanks.
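
As a hedged sketch of the first option, registering such an option with Apache 
Commons CLI (which Flink's CLI parsing builds on) could look roughly like the 
following; the option names here are assumptions for illustration, not the 
actual patch:

```java
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

public class TaskManagerArgs {
    public static String parseConfigDir(String[] args) throws Exception {
        Options options = new Options();
        // Hypothetical option registration for the configuration directory.
        options.addOption(Option.builder("c")
                .longOpt("configDir")
                .hasArg()
                .required()
                .desc("Directory that contains the Flink configuration")
                .build());
        CommandLine line = new DefaultParser().parse(options, args);
        return line.getOptionValue("configDir");
    }
}
```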


> Add proper command line parsing tool to TaskManagerRunner.main
> --
>
> Key: FLINK-7095
> URL: https://issues.apache.org/jira/browse/FLINK-7095
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> We need to add a proper command line parsing tool to the entry point of the 
> {{TaskManagerRunner#main}}. At the moment, we are simply using the 
> {{ParameterTool}} as a temporary solution.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5375: [FLINK-7095] [TaskManager] Add Command line parsing tool ...

2018-01-30 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5375
  
@tillrohrmann I just checked: it seems there is no ```Option``` registered 
yet for parsing the ```configDir``` parameter. I could either register the 
```configDir``` option somewhere, such as in the ```CliFrontendParser``` 
class, or directly use ```TaskManager.parseArgsAndLoadConfig(args);``` to get 
the ```Configuration```. What do you think? @tillrohrmann Thanks.


---


[jira] [Commented] (FLINK-8407) Setting the parallelism after a partitioning operation should be forbidden

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346348#comment-16346348
 ] 

ASF GitHub Bot commented on FLINK-8407:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5369
  
Ah, it doesn't matter. The test has been updated. Actually, I wanted to 
ensure that all the partitioning methods cause the exception, but that would 
be fussy, so I only kept the broadcast one.


> Setting the parallelism after a partitioning operation should be forbidden
> --
>
> Key: FLINK-8407
> URL: https://issues.apache.org/jira/browse/FLINK-8407
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Major
>
> Partitioning operations ({{shuffle}}, {{rescale}}, etc.) on a {{DataStream}} 
> create new {{DataStreams}}, which allow the users to set parallelisms for 
> them. However, the {{PartitionTransformations}} in these returned 
> {{DataStreams}} only add virtual nodes, whose parallelism cannot be 
> specified, to the execution graph. We should forbid users from setting the 
> parallelism after a partitioning operation, since it won't actually take 
> effect. The corresponding documentation should also be updated.
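
For illustration, a hedged sketch of the misuse this change guards against 
(the API shape follows the issue text; whether the forbidden call compiles 
exactly this way in 1.4 is an assumption):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionParallelismExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // A partitioning step yields a new stream backed by a virtual
        // PartitionTransformation node; a parallelism set on it cannot take effect.
        DataStream<Integer> partitioned = env.fromElements(1, 2, 3).broadcast();

        // After this change, a call like the following should fail fast
        // (e.g. with an exception) instead of being silently ignored:
        // partitioned.setParallelism(4);
    }
}
```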



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5369: [FLINK-8407][DataStream]Setting the parallelism after a p...

2018-01-30 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5369
  
Ah, it doesn't matter. 😄 The test has been updated. Actually, I wanted to 
ensure that all the partitioning methods cause the exception, but that would 
be fussy, so I only kept the broadcast one.


---


[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346320#comment-16346320
 ] 

ASF GitHub Bot commented on FLINK-8101:
---

Github user yew1eb commented on a diff in the pull request:

https://github.com/apache/flink/pull/5374#discussion_r164961589
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -32,7 +32,6 @@
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.client.Client;
--- End diff --

This should not be deleted; it is still used by the {@link Client} reference 
in the Javadoc.


> Elasticsearch 6.x support
> -
>
> Key: FLINK-8101
> URL: https://issues.apache.org/jira/browse/FLINK-8101
> Project: Flink
>  Issue Type: New Feature
>  Components: ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Assignee: Flavio Pompermaier
>Priority: Major
> Fix For: 1.5.0
>
>
> Recently, Elasticsearch 6.0.0 was released: 
> https://www.elastic.co/blog/elasticsearch-6-0-0-released  
> The minimum Elasticsearch Java Client version that is compatible with ES6 is 
> 5.6.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ ...

2018-01-30 Thread yew1eb
Github user yew1eb commented on a diff in the pull request:

https://github.com/apache/flink/pull/5374#discussion_r164961589
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -32,7 +32,6 @@
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.client.Client;
--- End diff --

This should not be deleted; it is still used by the {@link Client} reference 
in the Javadoc.


---


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346257#comment-16346257
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r164952695
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -93,6 +93,12 @@
/** User supplied deserialization schema to convert Kinesis byte 
messages to Flink objects. */
private final KinesisDeserializationSchema deserializer;
 
+   /**
+* The function that determines which subtask a shard should be 
assigned to.
+*/
+   // TODO: instead of the property, use a factory method that would allow 
subclass to access source context?
--- End diff --

A createFn(...) would allow the function to be created with access to the 
runtime context (like the number of subtasks); the fn signature would then be 
changed to take only the shard metadata as a parameter. Subclasses can 
override createFn instead of having the property.
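
A hedged sketch of that suggestion (the interface and method names are 
assumptions for illustration, not the actual Flink API):

```java
import java.io.Serializable;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;

// Hypothetical factory: created with access to the runtime context
// (e.g. the number of subtasks), as suggested in the review comment.
public interface ShardAssignerFactory extends Serializable {
    ShardAssigner create(RuntimeContext runtimeContext);
}

// Hypothetical assignment function: takes only shard metadata, replacing the
// default hashCode-based round robin that skews after Kinesis resharding.
interface ShardAssigner {
    int assignToSubtask(StreamShardHandle shard);
}
```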


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-01-30 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r164952695
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -93,6 +93,12 @@
/** User supplied deserialization schema to convert Kinesis byte 
messages to Flink objects. */
private final KinesisDeserializationSchema deserializer;
 
+   /**
+* The function that determines which subtask a shard should be 
assigned to.
+*/
+   // TODO: instead of the property, use a factory method that would allow 
subclass to access source context?
--- End diff --

A createFn(...) would allow the function to be created with access to the 
runtime context (like the number of subtasks); the fn signature would then be 
changed to take only the shard metadata as a parameter. Subclasses can 
override createFn instead of having the property.


---


[jira] [Commented] (FLINK-8357) enable rolling in default log settings

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346201#comment-16346201
 ] 

ASF GitHub Bot commented on FLINK-8357:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5371
  
I have changed the code. @StephanEwen @XuMingmin, welcome to review it ~ 
Thanks. I set the ```MaxFileSize``` to 200MB and the retention to 30 days now.
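
For reference, a hedged sketch of what such a rolling configuration could look 
like with stock log4j 1.x (the appender class and property names are standard 
log4j; the 200MB/30-file values mirror this comment, and the actual patch may 
differ):

```properties
# Size-based rolling: keep at most 30 rolled files of up to 200MB each
log4j.rootLogger=INFO, file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.MaxFileSize=200MB
log4j.appender.file.MaxBackupIndex=30
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```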


> enable rolling in default log settings
> --
>
> Key: FLINK-8357
> URL: https://issues.apache.org/jira/browse/FLINK-8357
> Project: Flink
>  Issue Type: Improvement
>  Components: Logging
>Reporter: Xu Mingmin
>Assignee: mingleizhang
>Priority: Major
> Fix For: 1.5.0
>
>
> The release packages use {{org.apache.log4j.FileAppender}} for log4j and 
> {{ch.qos.logback.core.FileAppender}} for logback, which can result in very 
> large log files. 
> For most cases, if not all, we need to enable rotation in a production 
> cluster, and I suppose it's a good idea to make rotation the default.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5371: [FLINK-8357] [conf] Enable rolling in default log setting...

2018-01-30 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5371
  
I have changed the code. @StephanEwen @XuMingmin, welcome to review it ~ 
Thanks. I set the ```MaxFileSize``` to 200MB and the retention to 30 days now.


---


[jira] [Comment Edited] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-30 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346180#comment-16346180
 ] 

yanxiaobin edited comment on FLINK-8500 at 1/31/18 3:09 AM:


Hi [~aljoscha], thank you for your reply! Please look at the picture below.

!image-2018-01-31-10-48-59-633.png!

The final event time is obtained from "final long newTimestamp = 
extractAscendingTimestamp(element);", and the element was deserialized by the 
"KeyedDeserializationSchema". Also, the parameter "elementPrevTimestamp", 
which is the Kafka timestamp, is not used! So I think the deserialize method 
of KeyedDeserializationSchema should take an additional 'kafka message 
timestamp' parameter (from the ConsumerRecord). In some business scenarios, 
this is useful! Thanks!
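
A hedged sketch of the proposed extension (the extra parameter is the 
suggestion from this comment; the signature is modeled on Flink's 
KeyedDeserializationSchema but is not the actual API change):

```java
import java.io.IOException;
import java.io.Serializable;

// Hypothetical variant of KeyedDeserializationSchema whose deserialize()
// also receives the Kafka record timestamp from the ConsumerRecord.
public interface TimestampedKeyedDeserializationSchema<T> extends Serializable {
    T deserialize(byte[] messageKey,
                  byte[] message,
                  String topic,
                  int partition,
                  long offset,
                  long kafkaTimestamp) throws IOException;
}
```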


was (Author: backlight):
Hi [~aljoscha], thank you for your reply! Please look at the picture below.

!image-2018-01-31-10-48-59-633.png!

The final event time is obtained from "final long newTimestamp = 
extractAscendingTimestamp(element);", and the element was deserialized by the 
"KeyedDeserializationSchema". Also, the parameter "elementPrevTimestamp", 
which is the Kafka timestamp, is not used! Thanks!

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'kafka message 
> timestamp' parameter (from the ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-30 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346180#comment-16346180
 ] 

yanxiaobin edited comment on FLINK-8500 at 1/31/18 3:01 AM:


Hi [~aljoscha], thank you for your reply! Please look at the picture below.

!image-2018-01-31-10-48-59-633.png!

The final event time is obtained from "final long newTimestamp = 
extractAscendingTimestamp(element);", and the element was deserialized by the 
"KeyedDeserializationSchema". Also, the parameter "elementPrevTimestamp", 
which is the Kafka timestamp, is not used! Thanks!


was (Author: backlight):
Hi [~aljoscha], thank you for your reply! Please look at the picture below.

!image-2018-01-31-10-48-59-633.png!

The final event time is obtained from "final long newTimestamp = 
extractAscendingTimestamp(element);", and the element was deserialized by the 
"KeyedDeserializationSchema". Also, the parameter "elementPrevTimestamp" is 
not used! Thanks!

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'kafka message 
> timestamp' parameter (from the ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-30 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346180#comment-16346180
 ] 

yanxiaobin commented on FLINK-8500:
---

Hi [~aljoscha], thank you for your reply! Please look at the picture below.

!image-2018-01-31-10-48-59-633.png!

The final event time is obtained from "final long newTimestamp = 
extractAscendingTimestamp(element);", and the element was deserialized by the 
"KeyedDeserializationSchema". Also, the parameter "elementPrevTimestamp" is 
not used! Thanks!

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'kafka message 
> timestamp' parameter (from the ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8526) When use parallelism equals to half of the number of cpu, join and shuffle operators will easly cause deadlock.

2018-01-30 Thread zhu.qing (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhu.qing updated FLINK-8526:

Description: 
The next program attached will stuck at some special parallelism in some 
situation. When parallelism is 80 in previous setting, The program will always 
stuck. And when parallelism is 100, everything goes well.  According to my 
research I found when the parallelism equals to number of taskslots. The 
program is not fastest and probably caused network buffer not enough. How 
networker buffer related to parallelism and  how parallelism relate to running 
task (In other words we have 160 taskslots but running task can be far more 
than taskslots). 

Parallelism cannot be equals to half of the cpu.

Or will casuse "java.io.FileNotFoundException". You can repeat exception on 
your pc and set your parallelism equals to half of your cpu core.
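
For context, a hedged sketch of the pre-1.5 network buffer setting this likely 
interacts with (the key exists in Flink 1.4; the value below only applies the 
rule of thumb from the Flink docs to the reported setup of 8 TaskManagers with 
20 slots each, and is not a verified fix):

```yaml
# flink-conf.yaml
# rule of thumb from the Flink 1.4 docs: slots-per-TM^2 * number-of-TMs * 4
# here: 20 * 20 * 8 * 4 = 12800
taskmanager.network.numberOfBuffers: 12800
```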

  was:
The attached program gets stuck at certain parallelism settings. With a 
parallelism of 80 in the setup described above, the program always gets stuck, 
while with a parallelism of 100 everything works. From my investigation, when 
the parallelism equals the number of task slots, the program is not at its 
fastest, probably because there are not enough network buffers. How are 
network buffers related to the parallelism, and how does the parallelism 
relate to the number of running tasks (in other words, we have 160 task slots, 
but the number of running tasks can be far larger)?

The parallelism cannot equal half the number of CPU cores.

Or it will cause a "java.io.FileNotFoundException".


> When use parallelism equals to half of the number of cpu, join and shuffle 
> operators will easly cause deadlock.
> ---
>
> Key: FLINK-8526
> URL: https://issues.apache.org/jira/browse/FLINK-8526
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Java API, Local Runtime
>Affects Versions: 1.4.0
> Environment: 8 machines (96GB RAM and 24 cores each), 20 task slots per 
> taskmanager, twitter-2010 dataset, and parallelism set to 80. I run my code 
> in standalone mode. 
>Reporter: zhu.qing
>Priority: Major
> Attachments: T2AdjActiveV.java, T2AdjMessage.java
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> The attached program gets stuck at certain parallelism settings. With a 
> parallelism of 80 in the setup described above, the program always gets 
> stuck, while with a parallelism of 100 everything works. From my 
> investigation, when the parallelism equals the number of task slots, the 
> program is not at its fastest, probably because there are not enough network 
> buffers. How are network buffers related to the parallelism, and how does 
> the parallelism relate to the number of running tasks (in other words, we 
> have 160 task slots, but the number of running tasks can be far larger)? 
> The parallelism cannot equal half the number of CPU cores.
> Or it will cause a "java.io.FileNotFoundException". You can reproduce the 
> exception on your machine by setting the parallelism to half of your CPU 
> core count.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8526) When use parallelism equals to half of the number of cpu, join and shuffle operators will easly cause deadlock.

2018-01-30 Thread zhu.qing (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhu.qing updated FLINK-8526:

Description: 
The next program attached will stuck at some special parallelism in some 
situation. When parallelism is 80 in previous setting, The program will always 
stuck. And when parallelism is 100, everything goes well.  According to my 
research I found when the parallelism equals to number of taskslots. The 
program is not fastest and probably caused network buffer not enough. How 
networker buffer related to parallelism and  how parallelism relate to running 
task (In other words we have 160 taskslots but running task can be far more 
than taskslots). 

Parallelism cannot be equals to half of the cpu.

Or will casuse "java.io.FileNotFoundException"

  was:The attached program gets stuck at certain parallelism settings. With a 
parallelism of 80 in the setup described above, the program always gets stuck, 
while with a parallelism of 100 everything works. From my investigation, when 
the parallelism equals the number of task slots, the program is not at its 
fastest, probably because there are not enough network buffers. How are 
network buffers related to the parallelism, and how does the parallelism 
relate to the number of running tasks (in other words, we have 160 task slots, 
but the number of running tasks can be far larger)?


> When use parallelism equals to half of the number of cpu, join and shuffle 
> operators will easly cause deadlock.
> ---
>
> Key: FLINK-8526
> URL: https://issues.apache.org/jira/browse/FLINK-8526
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Java API, Local Runtime
>Affects Versions: 1.4.0
> Environment: 8 machines (96GB RAM and 24 cores each), 20 task slots per 
> taskmanager, twitter-2010 dataset, and parallelism set to 80. I run my code 
> in standalone mode. 
>Reporter: zhu.qing
>Priority: Major
> Attachments: T2AdjActiveV.java, T2AdjMessage.java
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> The attached program gets stuck at certain parallelism settings. With a 
> parallelism of 80 in the setup described above, the program always gets 
> stuck, while with a parallelism of 100 everything works. From my 
> investigation, when the parallelism equals the number of task slots, the 
> program is not at its fastest, probably because there are not enough network 
> buffers. How are network buffers related to the parallelism, and how does 
> the parallelism relate to the number of running tasks (in other words, we 
> have 160 task slots, but the number of running tasks can be far larger)? 
> The parallelism cannot equal half the number of CPU cores.
> Or it will cause a "java.io.FileNotFoundException".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8526) When use parallelism equals to half of the taskslot, join and shuffle operators will easly cause deadlock.

2018-01-30 Thread zhu.qing (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhu.qing updated FLINK-8526:

Description: The attached program gets stuck at certain parallelism settings. 
With a parallelism of 80 in the setup described above, the program always gets 
stuck, while with a parallelism of 100 everything works. From my 
investigation, when the parallelism equals the number of task slots, the 
program is not at its fastest, probably because there are not enough network 
buffers. How are network buffers related to the parallelism, and how does the 
parallelism relate to the number of running tasks (in other words, we have 160 
task slots, but the number of running tasks can be far larger)?  (was: The 
attached program gets stuck at certain parallelism settings. With a 
parallelism of 80 in the setup described above, the program always gets stuck, 
while with a parallelism of 100 everything works. From my investigation, when 
the parallelism equals the number of task slots, the program is not at its 
fastest, probably because there are not enough network buffers. How are 
network buffers related to the parallelism, and how does the parallelism 
relate to the number of running tasks (in other words, we have 160 task slots, 
but the number of running tasks can be far larger)?)

> When use parallelism equals to half of the taskslot, join and shuffle 
> operators will easly cause deadlock.
> --
>
> Key: FLINK-8526
> URL: https://issues.apache.org/jira/browse/FLINK-8526
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Java API, Local Runtime
>Affects Versions: 1.4.0
> Environment: 8 machines (96GB RAM and 24 cores each), 20 task slots per 
> taskmanager, twitter-2010 dataset, and parallelism set to 80. I run my code 
> in standalone mode. 
>Reporter: zhu.qing
>Priority: Major
> Attachments: T2AdjActiveV.java, T2AdjMessage.java
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> The attached program gets stuck at certain parallelism settings. With a 
> parallelism of 80 in the setup described above, the program always gets 
> stuck, while with a parallelism of 100 everything works. From my 
> investigation, when the parallelism equals the number of task slots, the 
> program is not at its fastest, probably because there are not enough network 
> buffers. How are network buffers related to the parallelism, and how does 
> the parallelism relate to the number of running tasks (in other words, we 
> have 160 task slots, but the number of running tasks can be far larger)? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-30 Thread yanxiaobin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yanxiaobin updated FLINK-8500:
--
Attachment: image-2018-01-31-10-48-59-633.png

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'kafka message 
> timestamp' parameter (from the ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8526) When use parallelism equals to half of the number of cpu, join and shuffle operators will easly cause deadlock.

2018-01-30 Thread zhu.qing (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhu.qing updated FLINK-8526:

Summary: When use parallelism equals to half of the number of cpu, join and 
shuffle operators will easly cause deadlock.  (was: When use parallelism equals 
to half of the taskslot, join and shuffle operators will easly cause deadlock.)

> When use parallelism equals to half of the number of cpu, join and shuffle 
> operators will easly cause deadlock.
> ---
>
> Key: FLINK-8526
> URL: https://issues.apache.org/jira/browse/FLINK-8526
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Java API, Local Runtime
>Affects Versions: 1.4.0
> Environment: 8 machines (96GB RAM and 24 cores each), 20 task slots per 
> taskmanager, twitter-2010 dataset, and parallelism set to 80. I run my code 
> in standalone mode. 
>Reporter: zhu.qing
>Priority: Major
> Attachments: T2AdjActiveV.java, T2AdjMessage.java
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> The attached program gets stuck at certain parallelism settings. With a 
> parallelism of 80 in the setup described above, the program always gets 
> stuck, while with a parallelism of 100 everything works. From my 
> investigation, when the parallelism equals the number of task slots, the 
> program is not at its fastest, probably because there are not enough network 
> buffers. How are network buffers related to the parallelism, and how does 
> the parallelism relate to the number of running tasks (in other words, we 
> have 160 task slots, but the number of running tasks can be far larger)? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8526) When use parallelism equals to half of the taskslot, join and shuffle operators will easly cause deadlock.

2018-01-30 Thread zhu.qing (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhu.qing updated FLINK-8526:

Summary: When use parallelism equals to half of the taskslot, join and 
shuffle operators will easly cause deadlock.  (was: When use parallelism equals 
to half of the taskslot, join and shuffle operators will easly caused deadlock.)

> When use parallelism equals to half of the taskslot, join and shuffle 
> operators will easly cause deadlock.
> --
>
> Key: FLINK-8526
> URL: https://issues.apache.org/jira/browse/FLINK-8526
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Java API, Local Runtime
>Affects Versions: 1.4.0
> Environment: 8 machines (96GB RAM and 24 cores each), 20 task slots per 
> taskmanager, twitter-2010 dataset, and parallelism set to 80. I run my code 
> in standalone mode. 
>Reporter: zhu.qing
>Priority: Major
> Attachments: T2AdjActiveV.java, T2AdjMessage.java
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> The attached program gets stuck at certain parallelism settings. With a 
> parallelism of 80 in the setup described above, the program always gets 
> stuck, while with a parallelism of 100 everything works. From my 
> investigation, when the parallelism equals the number of task slots, the 
> program is not at its fastest, probably because there are not enough network 
> buffers. How are network buffers related to the parallelism, and how does 
> the parallelism relate to the number of running tasks (in other words, we 
> have 160 task slots, but the number of running tasks can be far larger)?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8526) When use parallelism equals to half of the taskslot, join and shuffle operators will easly caused deadlock.

2018-01-30 Thread zhu.qing (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhu.qing updated FLINK-8526:

Summary: When use parallelism equals to half of the taskslot, join and 
shuffle operators will easly caused deadlock.  (was: When use parallelism 
equals to half of the taskslot, join and shuffle operator will easly caused 
deadlock.)

> When use parallelism equals to half of the taskslot, join and shuffle 
> operators will easly caused deadlock.
> ---
>
> Key: FLINK-8526
> URL: https://issues.apache.org/jira/browse/FLINK-8526
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Java API, Local Runtime
>Affects Versions: 1.4.0
> Environment: 8 machines (96GB RAM and 24 cores each), 20 task slots per 
> taskmanager, twitter-2010 dataset, and parallelism set to 80. I run my code 
> in standalone mode. 
>Reporter: zhu.qing
>Priority: Major
> Attachments: T2AdjActiveV.java, T2AdjMessage.java
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> The attached program gets stuck at certain parallelism settings. With a 
> parallelism of 80 in the setup described above, the program always gets 
> stuck, while with a parallelism of 100 everything works. From my 
> investigation, when the parallelism equals the number of task slots, the 
> program is not at its fastest, probably because there are not enough network 
> buffers. How are network buffers related to the parallelism, and how does 
> the parallelism relate to the number of running tasks (in other words, we 
> have 160 task slots, but the number of running tasks can be far larger)?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8526) When use parallelism equals to half of the taskslot, join and shuffle operator will easly caused deadlock.

2018-01-30 Thread zhu.qing (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhu.qing updated FLINK-8526:

Summary: When use parallelism equals to half of the taskslot, join and 
shuffle operator will easly caused deadlock.  (was: When use some parallelism, 
the program will stuck in some setting. )

> When use parallelism equals to half of the taskslot, join and shuffle 
> operator will easly caused deadlock.
> --
>
> Key: FLINK-8526
> URL: https://issues.apache.org/jira/browse/FLINK-8526
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, Java API, Local Runtime
>Affects Versions: 1.4.0
> Environment: 8 machines (96GB RAM and 24 cores each), 20 task slots per 
> taskmanager, twitter-2010 dataset, and parallelism set to 80. I run my code 
> in standalone mode. 
>Reporter: zhu.qing
>Priority: Major
> Attachments: T2AdjActiveV.java, T2AdjMessage.java
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> The attached program gets stuck at certain parallelism settings. With a 
> parallelism of 80 in the setup described above, the program always gets 
> stuck, while with a parallelism of 100 everything works. From my 
> investigation, when the parallelism equals the number of task slots, the 
> program is not at its fastest, probably because there are not enough network 
> buffers. How are network buffers related to the parallelism, and how does 
> the parallelism relate to the number of running tasks (in other words, we 
> have 160 task slots, but the number of running tasks can be far larger)?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8357) enable rolling in default log settings

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346161#comment-16346161
 ] 

ASF GitHub Bot commented on FLINK-8357:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5371#discussion_r164940108
  
--- Diff: flink-dist/src/main/flink-bin/conf/log4j.properties ---
@@ -31,10 +31,11 @@ log4j.logger.org.apache.hadoop=INFO
 log4j.logger.org.apache.zookeeper=INFO
 
 # Log all infos in the given file
-log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.file.file=${log.file}
 log4j.appender.file.append=false
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.DatePattern='.'yyyy-MM-dd
--- End diff --

Will add soon.


> enable rolling in default log settings
> --
>
> Key: FLINK-8357
> URL: https://issues.apache.org/jira/browse/FLINK-8357
> Project: Flink
>  Issue Type: Improvement
>  Components: Logging
>Reporter: Xu Mingmin
>Assignee: mingleizhang
>Priority: Major
> Fix For: 1.5.0
>
>
> The release packages use {{org.apache.log4j.FileAppender}} for log4j and 
> {{ch.qos.logback.core.FileAppender}} for logback, which can result in very 
> large log files. 
> For most cases, if not all, we need to enable rotation in a production 
> cluster, and I suppose it's a good idea to make rotation the default.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8357) enable rolling in default log settings

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346159#comment-16346159
 ] 

ASF GitHub Bot commented on FLINK-8357:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5371#discussion_r164940066
  
--- Diff: flink-dist/src/main/flink-bin/conf/log4j-cli.properties ---
@@ -19,10 +19,11 @@
 log4j.rootLogger=INFO, file
 
 # Log all infos in the given file
-log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.file.file=${log.file}
 log4j.appender.file.append=false
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.DatePattern='.'yyyy-MM-dd
--- End diff --

Will add soon.


> enable rolling in default log settings
> --
>
> Key: FLINK-8357
> URL: https://issues.apache.org/jira/browse/FLINK-8357
> Project: Flink
>  Issue Type: Improvement
>  Components: Logging
>Reporter: Xu Mingmin
>Assignee: mingleizhang
>Priority: Major
> Fix For: 1.5.0
>
>
> The release packages use {{org.apache.log4j.FileAppender}} for log4j and 
> {{ch.qos.logback.core.FileAppender}} for logback, which can result in very 
> large log files. 
> For most cases, if not all, we need to enable rotation in a production 
> cluster, and I suppose it's a good idea to make rotation the default.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8357) enable rolling in default log settings

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346162#comment-16346162
 ] 

ASF GitHub Bot commented on FLINK-8357:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5371#discussion_r164940131
  
--- Diff: flink-dist/src/main/flink-bin/conf/logback-yarn.xml ---
@@ -17,7 +17,7 @@
   -->
 
 
-<appender name="file" class="ch.qos.logback.core.FileAppender">
+<appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
--- End diff --

Will add soon.


> enable rolling in default log settings
> --
>
> Key: FLINK-8357
> URL: https://issues.apache.org/jira/browse/FLINK-8357
> Project: Flink
>  Issue Type: Improvement
>  Components: Logging
>Reporter: Xu Mingmin
>Assignee: mingleizhang
>Priority: Major
> Fix For: 1.5.0
>
>
> The release packages use {{org.apache.log4j.FileAppender}} for log4j and 
> {{ch.qos.logback.core.FileAppender}} for logback, which can result in very 
> large log files. 
> For most cases, if not all, we need to enable rotation in a production 
> cluster, and I suppose it's a good idea to make rotation the default.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5371: [FLINK-8357] [conf] Enable rolling in default log ...

2018-01-30 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5371#discussion_r164940131
  
--- Diff: flink-dist/src/main/flink-bin/conf/logback-yarn.xml ---
@@ -17,7 +17,7 @@
   -->
 
 
-<appender name="file" class="ch.qos.logback.core.FileAppender">
+<appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
--- End diff --

Will add soon.


---


[GitHub] flink pull request #5371: [FLINK-8357] [conf] Enable rolling in default log ...

2018-01-30 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5371#discussion_r164940108
  
--- Diff: flink-dist/src/main/flink-bin/conf/log4j.properties ---
@@ -31,10 +31,11 @@ log4j.logger.org.apache.hadoop=INFO
 log4j.logger.org.apache.zookeeper=INFO
 
 # Log all infos in the given file
-log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.file.file=${log.file}
 log4j.appender.file.append=false
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.DatePattern='.'yyyy-MM-dd
--- End diff --

Will add soon.


---


[GitHub] flink pull request #5371: [FLINK-8357] [conf] Enable rolling in default log ...

2018-01-30 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5371#discussion_r164940066
  
--- Diff: flink-dist/src/main/flink-bin/conf/log4j-cli.properties ---
@@ -19,10 +19,11 @@
 log4j.rootLogger=INFO, file
 
 # Log all infos in the given file
-log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.file.file=${log.file}
 log4j.appender.file.append=false
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.DatePattern='.'yyyy-MM-dd
--- End diff --

Will add soon.


---


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346124#comment-16346124
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tweise closed the pull request at:

https://github.com/apache/flink/pull/5393


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-01-30 Thread tweise
Github user tweise closed the pull request at:

https://github.com/apache/flink/pull/5393


---


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346121#comment-16346121
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

GitHub user tweise opened a pull request:

https://github.com/apache/flink/pull/5393

[FLINK-8516] Allow for custom hash function for shard to subtask mapping in 
Kinesis consumer




*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-] [component] Title of 
the pull request", where *FLINK-* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  - The S3 file system connector: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)

[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-01-30 Thread tweise
GitHub user tweise opened a pull request:

https://github.com/apache/flink/pull/5393

[FLINK-8516] Allow for custom hash function for shard to subtask mapping in 
Kinesis consumer




*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-XXXX] [component] Title of 
the pull request", where *FLINK-XXXX* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  - The S3 file system connector: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tweise/flink FLINK-8516.shardHashing

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5393.patch

[jira] [Updated] (FLINK-7775) Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs

2018-01-30 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-7775:
--
Description: 
{code}
  public int getNumberOfCachedJobs() {
return jobRefCounters.size();
  }
{code}

The method of PermanentBlobCache is not used.
We should remove it.

  was:
{code}
  public int getNumberOfCachedJobs() {
return jobRefCounters.size();
  }
{code}
The method of PermanentBlobCache is not used.
We should remove it.


> Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs
> ---
>
> Key: FLINK-7775
> URL: https://issues.apache.org/jira/browse/FLINK-7775
> Project: Flink
>  Issue Type: Task
>  Components: Local Runtime
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   public int getNumberOfCachedJobs() {
> return jobRefCounters.size();
>   }
> {code}
> The method of PermanentBlobCache is not used.
> We should remove it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-7917) The return of taskInformationOrBlobKey should be placed inside synchronized in ExecutionJobVertex

2018-01-30 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-7917:
--
Component/s: Local Runtime

> The return of taskInformationOrBlobKey should be placed inside synchronized 
> in ExecutionJobVertex
> -
>
> Key: FLINK-7917
> URL: https://issues.apache.org/jira/browse/FLINK-7917
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Reporter: Ted Yu
>Priority: Minor
>
> Currently in ExecutionJobVertex#getTaskInformationOrBlobKey:
> {code}
> }
> return taskInformationOrBlobKey;
> {code}
> The return should be placed inside the synchronized block.
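
For illustration, the shape of the suggested fix (a sketch only; the method body is abridged and the lock object's name is an assumption, not the actual source):

{code}
// Sketch: return while still holding the lock, so the read of the lazily
// initialized field cannot race with a concurrent initialization.
synchronized (stateMonitor) {
    if (taskInformationOrBlobKey == null) {
        // ... compute and cache taskInformationOrBlobKey ...
    }
    return taskInformationOrBlobKey;  // moved inside the synchronized block
}
{code}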



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-3089) State API Should Support Data Expiration (State TTL)

2018-01-30 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345958#comment-16345958
 ] 

Bowen Li commented on FLINK-3089:
-

I wrote [this brief design 
doc|https://docs.google.com/document/d/1qiFqtCC80n4oFmmfxBWXrCd37mYKcuureyEr_nPAvSo/edit?usp=sharing].
 Can you guys please take a look? [~aljoscha] [~srichter] [~sihuazhou] 
[~xfournet]

What's the next step? Shall we draft a FLIP?

> State API Should Support Data Expiration (State TTL)
> 
>
> Key: FLINK-3089
> URL: https://issues.apache.org/jira/browse/FLINK-3089
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Niels Basjes
>Assignee: Bowen Li
>Priority: Major
>
> In some usecases (webanalytics) there is a need to have a state per visitor 
> on a website (i.e. keyBy(sessionid) ).
> At some point the visitor simply leaves and no longer creates new events (so 
> a special 'end of session' event will not occur).
> The only way to determine that a visitor has left is by choosing a timeout, 
> like "After 30 minutes no events we consider the visitor 'gone'".
> Only after this (chosen) timeout has expired should we discard this state.
> In the Trigger part of Windows we can set a timer and close/discard this kind 
> of information. But that introduces the buffering effect of the window (which 
> in some scenarios is unwanted).
> What I would like is to be able to set a timeout on a specific state which I 
> can update afterwards.
> This makes it possible to create a map function that assigns the right value 
> and that discards the state automatically.
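
For reference, this is roughly what the timer-based workaround looks like today (a minimal sketch, assuming a stream keyed by session id; the class and state names are made up). Unlike the window approach mentioned above, it buffers nothing:

{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: emulate per-key state expiry with processing-time timers.
// Input: (sessionId, value) tuples on a stream keyed by f0.
public class ExpiringCount extends ProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private static final long TTL_MS = 30 * 60 * 1000; // "visitor gone" after 30 minutes

    private transient ValueState<Long> count;
    private transient ValueState<Long> expiryTime;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        expiryTime = getRuntimeContext().getState(new ValueStateDescriptor<>("expiry", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> event, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = count.value();
        count.update(current == null ? event.f1 : current + event.f1);

        // push the expiry forward on every event and arm a timer for it
        long newExpiry = ctx.timerService().currentProcessingTime() + TTL_MS;
        expiryTime.update(newExpiry);
        ctx.timerService().registerProcessingTimeTimer(newExpiry);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        Long expiry = expiryTime.value();
        if (expiry != null && timestamp >= expiry) { // only the most recent timer clears
            count.clear();
            expiryTime.clear();
        }
    }
}
{code}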



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7795) Utilize error-prone to discover common coding mistakes

2018-01-30 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345955#comment-16345955
 ] 

Ted Yu commented on FLINK-7795:
---

error-prone has a JDK 8 dependency.

> Utilize error-prone to discover common coding mistakes
> --
>
> Key: FLINK-7795
> URL: https://issues.apache.org/jira/browse/FLINK-7795
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>Priority: Major
>
> http://errorprone.info/ is a tool which detects common coding mistakes.
> We should incorporate it into the Flink build process.
> Here are the dependencies:
> {code}
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>error_prone_annotation</artifactId>
>   <version>${error-prone.version}</version>
>   <scope>provided</scope>
> </dependency>
> <dependency>
>   <groupId>com.google.auto.service</groupId>
>   <artifactId>auto-service</artifactId>
>   <version>1.0-rc3</version>
>   <optional>true</optional>
> </dependency>
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>error_prone_check_api</artifactId>
>   <version>${error-prone.version}</version>
>   <scope>provided</scope>
>   <exclusions>
>     <exclusion>
>       <groupId>com.google.code.findbugs</groupId>
>       <artifactId>jsr305</artifactId>
>     </exclusion>
>   </exclusions>
> </dependency>
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>javac</artifactId>
>   <version>9-dev-r4023-3</version>
>   <scope>provided</scope>
> </dependency>
> {code}
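
For illustration, one common way to hook error-prone into a Maven build at that time was through the plexus compiler adapter (a sketch; the versions and the adapter choice are assumptions, not a vetted Flink configuration):

{code}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-compiler-plugin</artifactId>
  <configuration>
    <compilerId>javac-with-errorprone</compilerId>
    <forceJavacCompilerUse>true</forceJavacCompilerUse>
    <source>1.8</source>
    <target>1.8</target>
  </configuration>
  <dependencies>
    <dependency>
      <!-- adapter that lets maven-compiler-plugin run the error-prone javac -->
      <groupId>org.codehaus.plexus</groupId>
      <artifactId>plexus-compiler-javac-errorprone</artifactId>
      <version>2.8.2</version>
    </dependency>
    <dependency>
      <groupId>com.google.errorprone</groupId>
      <artifactId>error_prone_core</artifactId>
      <version>2.1.3</version>
    </dependency>
  </dependencies>
</plugin>
{code}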



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8037) Missing cast in integer arithmetic in TransactionalIdsGenerator#generateIdsToAbort

2018-01-30 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-8037:
--
Labels: kafka-connect  (was: )

> Missing cast in integer arithmetic in 
> TransactionalIdsGenerator#generateIdsToAbort
> --
>
> Key: FLINK-8037
> URL: https://issues.apache.org/jira/browse/FLINK-8037
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Greg Hogan
>Priority: Minor
>  Labels: kafka-connect
>
> {code}
>   public Set<Long> generateIdsToAbort() {
>     Set<Long> idsToAbort = new HashSet<>();
>     for (int i = 0; i < safeScaleDownFactor; i++) {
>       idsToAbort.addAll(generateIdsToUse(i * poolSize * totalNumberOfSubtasks));
> {code}
> The operands are integers, whereas generateIdsToUse() expects a long parameter, 
> so the multiplication can overflow before the implicit widening happens.
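
A minimal sketch of the fix, widening the first operand so the whole product is computed in long arithmetic:

{code}
idsToAbort.addAll(generateIdsToUse((long) i * poolSize * totalNumberOfSubtasks));
{code}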



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8335) Upgrade hbase connector dependency to 1.4.0

2018-01-30 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345946#comment-16345946
 ] 

Ted Yu commented on FLINK-8335:
---

The 1.4.1 RC is being voted on.

Once it passes, we can upgrade to 1.4.1.

> Upgrade hbase connector dependency to 1.4.0
> ---
>
> Key: FLINK-8335
> URL: https://issues.apache.org/jira/browse/FLINK-8335
> Project: Flink
>  Issue Type: Improvement
>  Components: Batch Connectors and Input/Output Formats
>Reporter: Ted Yu
>Priority: Minor
>
> hbase 1.4.0 has been released.
> 1.4.0 shows speed improvement over previous 1.x releases.
> http://search-hadoop.com/m/HBase/YGbbBxedD1Mnm8t?subj=Re+VOTE+The+second+HBase+1+4+0+release+candidate+RC1+is+available
> This issue is to upgrade the dependency to 1.4.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8357) enable rolling in default log settings

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345905#comment-16345905
 ] 

ASF GitHub Bot commented on FLINK-8357:
---

Github user XuMingmin commented on a diff in the pull request:

https://github.com/apache/flink/pull/5371#discussion_r164895415
  
--- Diff: flink-dist/src/main/flink-bin/conf/log4j-cli.properties ---
@@ -19,10 +19,11 @@
 log4j.rootLogger=INFO, file
 
 # Log all infos in the given file
-log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.file.file=${log.file}
 log4j.appender.file.append=false
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.DatePattern=.yyyy-MM-dd
--- End diff --

add `log4j.appender.file.MaxFileSize` and  
`log4j.appender.file.MaxBackupIndex` to limit the total size of log files.
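
As a side note, in log4j 1.x those two settings only exist on RollingFileAppender, not on DailyRollingFileAppender; a size-capped variant would look roughly like this (a sketch, not the proposed patch):

{code}
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.MaxFileSize=100MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
{code}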


> enable rolling in default log settings
> --
>
> Key: FLINK-8357
> URL: https://issues.apache.org/jira/browse/FLINK-8357
> Project: Flink
>  Issue Type: Improvement
>  Components: Logging
>Reporter: Xu Mingmin
>Assignee: mingleizhang
>Priority: Major
> Fix For: 1.5.0
>
>
> The release packages use {{org.apache.log4j.FileAppender}} for log4j and 
> {{ch.qos.logback.core.FileAppender}} for logback, which can result in very 
> large log files. 
> For most cases, if not all, we need to enable rotation in a production 
> cluster, and I suppose it's a good idea to make rotation the default.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8357) enable rolling in default log settings

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345907#comment-16345907
 ] 

ASF GitHub Bot commented on FLINK-8357:
---

Github user XuMingmin commented on a diff in the pull request:

https://github.com/apache/flink/pull/5371#discussion_r164895963
  
--- Diff: flink-dist/src/main/flink-bin/conf/logback-yarn.xml ---
@@ -17,7 +17,7 @@
   -->
 
 
-<appender name="file" class="ch.qos.logback.core.FileAppender">
+<appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
--- End diff --

add a `rollingPolicy` to limit the size of log files.
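
For reference, a logback appender with a rolling policy could look like this (a sketch; the pattern and limits are placeholders, not the proposed patch):

{code}
<appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
  <file>${log.file}</file>
  <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
    <fileNamePattern>${log.file}.%d{yyyy-MM-dd}</fileNamePattern>
    <maxHistory>10</maxHistory>
  </rollingPolicy>
  <encoder>
    <pattern>%d{yyyy-MM-dd HH:mm:ss,SSS} %-5level %logger{60} - %msg%n</pattern>
  </encoder>
</appender>
{code}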


> enable rolling in default log settings
> --
>
> Key: FLINK-8357
> URL: https://issues.apache.org/jira/browse/FLINK-8357
> Project: Flink
>  Issue Type: Improvement
>  Components: Logging
>Reporter: Xu Mingmin
>Assignee: mingleizhang
>Priority: Major
> Fix For: 1.5.0
>
>
> The release packages use {{org.apache.log4j.FileAppender}} for log4j and 
> {{ch.qos.logback.core.FileAppender}} for logback, which can result in very 
> large log files. 
> For most cases, if not all, we need to enable rotation in a production 
> cluster, and I suppose it's a good idea to make rotation the default.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8357) enable rolling in default log settings

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345906#comment-16345906
 ] 

ASF GitHub Bot commented on FLINK-8357:
---

Github user XuMingmin commented on a diff in the pull request:

https://github.com/apache/flink/pull/5371#discussion_r164895503
  
--- Diff: flink-dist/src/main/flink-bin/conf/log4j.properties ---
@@ -31,10 +31,11 @@ log4j.logger.org.apache.hadoop=INFO
 log4j.logger.org.apache.zookeeper=INFO
 
 # Log all infos in the given file
-log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.file.file=${log.file}
 log4j.appender.file.append=false
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.DatePattern=.yyyy-MM-dd
--- End diff --

same as above


> enable rolling in default log settings
> --
>
> Key: FLINK-8357
> URL: https://issues.apache.org/jira/browse/FLINK-8357
> Project: Flink
>  Issue Type: Improvement
>  Components: Logging
>Reporter: Xu Mingmin
>Assignee: mingleizhang
>Priority: Major
> Fix For: 1.5.0
>
>
> The release packages use {{org.apache.log4j.FileAppender}} for log4j and 
> {{ch.qos.logback.core.FileAppender}} for logback, which can result in very 
> large log files. 
> For most cases, if not all, we need to enable rotation in a production 
> cluster, and I suppose it's a good idea to make rotation the default.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5371: [FLINK-8357] [conf] Enable rolling in default log ...

2018-01-30 Thread XuMingmin
Github user XuMingmin commented on a diff in the pull request:

https://github.com/apache/flink/pull/5371#discussion_r164895415
  
--- Diff: flink-dist/src/main/flink-bin/conf/log4j-cli.properties ---
@@ -19,10 +19,11 @@
 log4j.rootLogger=INFO, file
 
 # Log all infos in the given file
-log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.file.file=${log.file}
 log4j.appender.file.append=false
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.DatePattern=.yyyy-MM-dd
--- End diff --

add `log4j.appender.file.MaxFileSize` and  
`log4j.appender.file.MaxBackupIndex` to limit the total size of log files.


---


[GitHub] flink pull request #5371: [FLINK-8357] [conf] Enable rolling in default log ...

2018-01-30 Thread XuMingmin
Github user XuMingmin commented on a diff in the pull request:

https://github.com/apache/flink/pull/5371#discussion_r164895503
  
--- Diff: flink-dist/src/main/flink-bin/conf/log4j.properties ---
@@ -31,10 +31,11 @@ log4j.logger.org.apache.hadoop=INFO
 log4j.logger.org.apache.zookeeper=INFO
 
 # Log all infos in the given file
-log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.file.file=${log.file}
 log4j.appender.file.append=false
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.DatePattern=.yyyy-MM-dd
--- End diff --

same as above


---


[GitHub] flink pull request #5371: [FLINK-8357] [conf] Enable rolling in default log ...

2018-01-30 Thread XuMingmin
Github user XuMingmin commented on a diff in the pull request:

https://github.com/apache/flink/pull/5371#discussion_r164895963
  
--- Diff: flink-dist/src/main/flink-bin/conf/logback-yarn.xml ---
@@ -17,7 +17,7 @@
   -->
 
 
-<appender name="file" class="ch.qos.logback.core.FileAppender">
+<appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
--- End diff --

add a `rollingPolicy` to limit the size of log files.


---


[jira] [Commented] (FLINK-8504) TaskExecutor does not properly deregisters JobManager from JobLeaderService

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345880#comment-16345880
 ] 

ASF GitHub Bot commented on FLINK-8504:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5361


> TaskExecutor does not properly deregisters JobManager from JobLeaderService
> ---
>
> Key: FLINK-8504
> URL: https://issues.apache.org/jira/browse/FLINK-8504
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{TaskExecutor}} should deregister jobs from the {{JobLeaderService}} 
> once it no longer holds slots for this job. The problem is that before 
> unregistering the job from the {{JobLeaderService}} in 
> {{TaskExecutor#freeInternal}}, the actual slot is freed which also removes 
> the {{JobID}} from the slot. Therefore, we lose the information to which job 
> the slot belonged. An easy solution would be to return a {{SlotInformation}} 
> object instead of the {{TaskSlot}} from {{TaskSlotTable#freeSlot}} which 
> contains the respective information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5361: [FLINK-8504] [flip6] Deregister jobs from the JobL...

2018-01-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5361


---


[jira] [Closed] (FLINK-8504) TaskExecutor does not properly deregisters JobManager from JobLeaderService

2018-01-30 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8504.

Resolution: Fixed

Fixed via e94a488dd78e7c2efdf55a67cea886ee15a641a6

> TaskExecutor does not properly deregisters JobManager from JobLeaderService
> ---
>
> Key: FLINK-8504
> URL: https://issues.apache.org/jira/browse/FLINK-8504
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{TaskExecutor}} should deregister jobs from the {{JobLeaderService}} 
> once it no longer holds slots for this job. The problem is that before 
> unregistering the job from the {{JobLeaderService}} in 
> {{TaskExecutor#freeInternal}}, the actual slot is freed which also removes 
> the {{JobID}} from the slot. Therefore, we lose the information to which job 
> the slot belonged. An easy solution would be to return a {{SlotInformation}} 
> object instead of the {{TaskSlot}} from {{TaskSlotTable#freeSlot}} which 
> contains the respective information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8531) Support separation of "Exclusive", "Shared" and "Task owned" state

2018-01-30 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-8531:
---

 Summary: Support separation of "Exclusive", "Shared" and "Task 
owned" state
 Key: FLINK-8531
 URL: https://issues.apache.org/jira/browse/FLINK-8531
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.5.0


Currently, all state created at a certain checkpoint goes into the directory 
{{chk-id}}.

With incremental checkpointing, some state is shared across checkpoints and is 
referenced by newer checkpoints. That way, old {{chk-id}} directories stay 
around, containing some shared chunks. That makes it hard for both users and 
cleanup hooks to determine when a {{chk-x}} directory can be deleted.

The same holds for state that can only ever be dropped by certain operators on 
the TaskManager, never by the JobManager / CheckpointCoordinator. Examples of 
such state are write-ahead logs, which need to be retained until the move to 
the target system is complete, which may in some cases be later than when the 
checkpoint that created them is disposed.

I propose to introduce different scopes for state:
  - **EXCLUSIVE** is for state that belongs to one checkpoint only
  - **SHARED** is for state that is possibly part of multiple checkpoints
  - **TASKOWNED** is for state that must never be dropped by the JobManager.
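
A sketch of how these scopes might be expressed in code (hypothetical; Flink had no such enum at this point):

{code}
public enum CheckpointedStateScope {
    EXCLUSIVE,  // owned by exactly one checkpoint, deleted with its chk-<id> directory
    SHARED,     // possibly referenced by several checkpoints (e.g. incremental state)
    TASKOWNED   // dropped only by the task itself, never by the JobManager
}
{code}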

For file based checkpoint targets, I propose that we have the following 
directory layout:
{code}
/user-defined-checkpoint-dir
|
+ --shared/
+ --taskowned/
+ --chk-1/
+ --chk-2/
+ --chk-3/
...
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-30 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-5823.
-
Resolution: Fixed

Fixed via edc6f1000704a492629d7bdf8cbfa5ba5c45bb1f

> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2018-01-30 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-5823.
---

> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8530) Enable detached job submission for RestClusterClient

2018-01-30 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8530:


 Summary: Enable detached job submission for RestClusterClient
 Key: FLINK-8530
 URL: https://issues.apache.org/jira/browse/FLINK-8530
 Project: Flink
  Issue Type: Improvement
  Components: Client
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


The {{RestClusterClient}} should also be able to submit jobs in detached mode. 
In detached mode, we don't wait for the {{JobExecutionResult}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8522) DefaultOperatorStateBackend writes data in checkpoint that is never read.

2018-01-30 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345395#comment-16345395
 ] 

Kostas Kloudas commented on FLINK-8522:
---

There is a commit for this here: [https://github.com/apache/flink/pull/5230]

It is the first commit of that PR.

> DefaultOperatorStateBackend writes data in checkpoint that is never read.
> -
>
> Key: FLINK-8522
> URL: https://issues.apache.org/jira/browse/FLINK-8522
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.4.1
>
>
> In the DefaultOperatorStateBackend, at line 283, we write an int into the 
> checkpoint declaring the number of operator states included in it. 
> This number is never read when restoring, which can lead to confusion and to 
> problems with backwards compatibility and/or with extending the types of state 
> we support (e.g. broadcast state).
> There are two easy solutions, either remove the line and do not write the 
> size, or make sure that we also read this number when restoring and simply 
> ignore it.
> I would go for the first one, i.e. remove the line. What do you think 
> [~richtesn] and [~tzulitai] ?
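
A sketch of the second option (hypothetical variable names):

{code}
// backwards-compatible variant: consume the int on restore and discard it
int numOperatorStates = in.readInt();  // written at line 283, never used afterwards
{code}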



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345394#comment-16345394
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5230
  
@aljoscha and @tzulitai ready for another review.


> Iterate over keyed state on broadcast side of connect with broadcast.
> -
>
> Key: FLINK-8345
> URL: https://issues.apache.org/jira/browse/FLINK-8345
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5230: [FLINK-8345] Add iterator of keyed state on broadcast sid...

2018-01-30 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5230
  
@aljoscha and @tzulitai ready for another review.


---


[jira] [Updated] (FLINK-8411) HeapListState#add(null) will wipe out entire list state

2018-01-30 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8411:

Fix Version/s: (was: 1.4.1)

> HeapListState#add(null) will wipe out entire list state
> ---
>
> Key: FLINK-8411
> URL: https://issues.apache.org/jira/browse/FLINK-8411
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.5.0
>
>
> You can see that {{HeapListState#add(null)}} will result in the whole state 
> being cleared or wiped out. There has never been a unit test for 
> {{ListState#add(null)}} in {{StateBackendTestBase}}.
> {code:java}
> // HeapListState
> @Override
>   public void add(V value) {
>   final N namespace = currentNamespace;
>   if (value == null) {
>   clear();
>   return;
>   }
>   final StateTable<K, N, ArrayList<V>> map = stateTable;
>   ArrayList<V> list = map.get(namespace);
>   if (list == null) {
>   list = new ArrayList<>();
>   map.put(namespace, list);
>   }
>   list.add(value);
>   }
> {code}
> {code:java}
> // RocksDBListState
> @Override
>   public void add(V value) throws IOException {
>   try {
>   writeCurrentKeyWithGroupAndNamespace();
>   byte[] key = keySerializationStream.toByteArray();
>   keySerializationStream.reset();
>   DataOutputViewStreamWrapper out = new 
> DataOutputViewStreamWrapper(keySerializationStream);
>   valueSerializer.serialize(value, out);
>   backend.db.merge(columnFamily, writeOptions, key, 
> keySerializationStream.toByteArray());
>   } catch (Exception e) {
>   throw new RuntimeException("Error while adding data to 
> RocksDB", e);
>   }
>   }
> {code}
> The fix should correct the behavior to be consistent between the two state 
> backends, as well as adding a unit test for {{ListState#add(null)}}. For the 
> correct behavior, I believe adding null with {{add(null)}} should simply be 
> ignored without any consequences.
> cc [~srichter]
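
A sketch of the proposed behavior (both backends would get the equivalent guard):

{code:java}
@Override
public void add(V value) {
    if (value == null) {
        return;  // previously HeapListState called clear() here, wiping the whole list
    }
    // ... append the value as before ...
}
{code}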



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-8411) HeapListState#add(null) will wipe out entire list state

2018-01-30 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened FLINK-8411:
-

Reopening to change the fixVersion: I'm reverting this on 1.4.x because it is a 
semantic change that might break some user programs.

> HeapListState#add(null) will wipe out entire list state
> ---
>
> Key: FLINK-8411
> URL: https://issues.apache.org/jira/browse/FLINK-8411
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.5.0
>
>
> You can see that {{HeapListState#add(null)}} will result in the whole state 
> being cleared or wiped out. There has never been a unit test for 
> {{ListState#add(null)}} in {{StateBackendTestBase}}.
> {code:java}
> // HeapListState
> @Override
>   public void add(V value) {
>   final N namespace = currentNamespace;
>   if (value == null) {
>   clear();
>   return;
>   }
>   final StateTable<K, N, ArrayList<V>> map = stateTable;
>   ArrayList<V> list = map.get(namespace);
>   if (list == null) {
>   list = new ArrayList<>();
>   map.put(namespace, list);
>   }
>   list.add(value);
>   }
> {code}
> {code:java}
> // RocksDBListState
> @Override
>   public void add(V value) throws IOException {
>   try {
>   writeCurrentKeyWithGroupAndNamespace();
>   byte[] key = keySerializationStream.toByteArray();
>   keySerializationStream.reset();
>   DataOutputViewStreamWrapper out = new 
> DataOutputViewStreamWrapper(keySerializationStream);
>   valueSerializer.serialize(value, out);
>   backend.db.merge(columnFamily, writeOptions, key, 
> keySerializationStream.toByteArray());
>   } catch (Exception e) {
>   throw new RuntimeException("Error while adding data to 
> RocksDB", e);
>   }
>   }
> {code}
> The fix should correct the behavior to be consistent between the two state 
> backends, as well as adding a unit test for {{ListState#add(null)}}. For the 
> correct behavior, I believe adding null with {{add(null)}} should simply be 
> ignored without any consequences.
> cc [~srichter]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8407) Setting the parallelism after a partitioning operation should be forbidden

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345369#comment-16345369
 ] 

ASF GitHub Bot commented on FLINK-8407:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5369
  
Ah dammit, you are right that we can't actually test that the method 
works correctly because there is no such method.

That was stupid, sorry for that!  


> Setting the parallelism after a partitioning operation should be forbidden
> --
>
> Key: FLINK-8407
> URL: https://issues.apache.org/jira/browse/FLINK-8407
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Major
>
> Partitioning operations ({{shuffle}}, {{rescale}}, etc.) for a {{DataStream}} 
> create new {{DataStreams}}, which allow the users to set parallelisms for 
> them. However, the {{PartitionTransformations}} in these returned 
> {{DataStreams}} will only add virtual nodes, whose parallelisms could not be 
> specified, in the execution graph. We should forbid users to set the 
> parallelism after a partitioning operation since they won't actually work. 
> Also the corresponding documents should be updated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5369: [FLINK-8407][DataStream]Setting the parallelism after a p...

2018-01-30 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5369
  
Ah dammit, you are right that we can't actually test that the method 
works correctly because there is no such method.

That was stupid, sorry for that! 😅 


---


[jira] [Commented] (FLINK-4812) Report Watermark metrics in all operators

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345340#comment-16345340
 ] 

ASF GitHub Bot commented on FLINK-4812:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5155
  
I like this a lot now!  

One last comment/idea I had is this: I don't like that `StreamTask` has 
`getInputWatermarkGauge()` for the only reason that we need it in the 
`OperatorChain` to set it on the head operator. Could this not be set at the 
end of `OneInputStreamTask.init()` the same way it is set for 
`TwoInputStreamTask.init()` (where we then also would have to set the 
min-input-watermark)?


> Report Watermark metrics in all operators
> -
>
> Key: FLINK-4812
> URL: https://issues.apache.org/jira/browse/FLINK-4812
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.5.0
>
>
> As reported by a user, Flink does currently not export the current low 
> watermark for sources 
> (http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/currentLowWatermark-metric-not-reported-for-all-tasks-td13770.html).
> This JIRA is for adding such a metric for the sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5155: [FLINK-4812][metrics] Expose currentLowWatermark for all ...

2018-01-30 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5155
  
I like this a lot now! 👍 

One last comment/idea I had is this: I don't like that `StreamTask` has 
`getInputWatermarkGauge()` for the only reason that we need it in the 
`OperatorChain` to set it on the head operator. Could this not be set at the 
end of `OneInputStreamTask.init()` the same way it is set for 
`TwoInputStreamTask.init()` (where we then also would have to set the 
min-input-watermark)?


---


[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345247#comment-16345247
 ] 

ASF GitHub Bot commented on FLINK-7608:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5161#discussion_r164780848
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -194,14 +190,20 @@ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output)

> LatencyGauge change to  histogram metric
> 
>
> Key: FLINK-7608
> URL: https://issues.apache.org/jira/browse/FLINK-7608
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Major
> Fix For: 1.5.0
>
>
> I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831]  to 
> export metrics to the log file.
> I found:
> {noformat}
> -- Gauges 
> -
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Map.0.latency:
>  value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, 
> p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Sink- Unnamed.0.latency: 
> value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, 
> p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5161: [FLINK-7608][metric] Refactor latency statistics m...

2018-01-30 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5161#discussion_r164780848
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -194,14 +190,20 @@ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output)

[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345237#comment-16345237
 ] 

ASF GitHub Bot commented on FLINK-7608:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5161
  
@yew1eb by `operatorName`, do you mean `name()` or `uid()`? I think 
both of these could make sense.


> LatencyGauge change to  histogram metric
> 
>
> Key: FLINK-7608
> URL: https://issues.apache.org/jira/browse/FLINK-7608
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Major
> Fix For: 1.5.0
>
>
> I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831]  to 
> export metrics to the log file.
> I found:
> {noformat}
> -- Gauges 
> -
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Map.0.latency:
>  value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, 
> p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Sink- Unnamed.0.latency: 
> value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, 
> p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5161: [FLINK-7608][metric] Refactor latency statistics metric

2018-01-30 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5161
  
@yew1eb by `operatorName`, do you mean `name()` or `uid()`? I think 
both of these could make sense.


---


[jira] [Commented] (FLINK-8525) Improve queryable state code examples

2018-01-30 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345234#comment-16345234
 ] 

Kostas Kloudas commented on FLINK-8525:
---

Yes, this is something that is not included in the documentation.

> Improve queryable state code examples
> -
>
> Key: FLINK-8525
> URL: https://issues.apache.org/jira/browse/FLINK-8525
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Queryable State
>Affects Versions: 1.4.0
>Reporter: jia liu
>Priority: Minor
>
> It's really very hard to understand the code in the documents.
> {code:java}
> QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
> // the state descriptor of the state to be fetched.
> ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
> new ValueStateDescriptor<>(
>   "average",
>   TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
>   Tuple2.of(0L, 0L));
> CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
> client.getKvState(jobId, "query-name", key, 
> BasicTypeInfo.LONG_TYPE_INFO, descriptor);
> // now handle the returned value
> resultFuture.thenAccept(response -> {
> try {
> Tuple2<Long, Long> res = response.get();
> } catch (Exception e) {
> e.printStackTrace();
> }
> });
> {code}
> I can't get the declarations of key and jobId from it. And most importantly, 
> there is no test case in the flink-queryable-state module.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8525) Improve queryable state code examples

2018-01-30 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345221#comment-16345221
 ] 

Chesnay Schepler commented on FLINK-8525:
-

One thing that should be shown is how to actually create a {{JobID}} from the 
ID shown in the UI, i.e. "{{JobID jobId = JobID.fromHexString("ABCDE");}}"

> Improve queryable state code examples
> -
>
> Key: FLINK-8525
> URL: https://issues.apache.org/jira/browse/FLINK-8525
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Queryable State
>Affects Versions: 1.4.0
>Reporter: jia liu
>Priority: Minor
>
> It's really very hard to understand the code in the documents.
> {code:java}
> QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
> // the state descriptor of the state to be fetched.
> ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
> new ValueStateDescriptor<>(
>   "average",
>   TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
>   Tuple2.of(0L, 0L));
> CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
> client.getKvState(jobId, "query-name", key, 
> BasicTypeInfo.LONG_TYPE_INFO, descriptor);
> // now handle the returned value
> resultFuture.thenAccept(response -> {
> try {
> Tuple2<Long, Long> res = response.get();
> } catch (Exception e) {
> e.printStackTrace();
> }
> });
> {code}
> I can't get the declarations of key and jobId from it. And most importantly, 
> there is no test case in the flink-queryable-state module.
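
Combining the snippet with Chesnay's suggestion, the two missing declarations could look like this (the hex string and key value are placeholders for illustration):

{code:java}
JobID jobId = JobID.fromHexString("b9dd5e12f2875b9e2ad0d4a8e4546e75");  // job id copied from the web UI
Long key = 1L;  // the key to query; its type must match BasicTypeInfo.LONG_TYPE_INFO above
{code}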



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345220#comment-16345220
 ] 

ASF GitHub Bot commented on FLINK-8479:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164776439
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will 
emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both 
the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined 
{@link JoinedProcessFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the 
right element
+ *
+ * @param <T1> The type of the elements in the left stream
+ * @param <T2> The type of the elements in the right stream
+ * @param <OUT> The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator<T1, T2, OUT>
+   extends AbstractUdfStreamOperator<OUT, JoinedProcessFunction<T1, T2, OUT>>
+   implements TwoInputStreamOperator<T1, T2, OUT> {
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final long inverseLowerBound;
+   private final long inverseUpperBound;
+
+   private final boolean lowerBoundInclusive;
+   private final boolean upperBoundInclusive;
+
+   private final long bucketGranularity = 1;
+
+   private static final String LEFT_BUFFER = "LEFT_BUFFER";
+   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+   private transient ValueState<Long> lastCleanupRightBuffer;
+   private transient ValueState<Long> lastCleanupLeftBuffer;
+
+   private transient MapState<Long, List<Tuple3<T1, Long, Boolean>>> leftBuffer;
+   private transient MapState<Long, List<Tuple3<T2, Long, Boolean>>> rightBuffer;
+
+   private final TypeSerializer<T1> leftTypeSerializer;
+   private final TypeSerializer<T2> rightTypeSerializer;
+
+   private transient TimestampedCollector<OUT> collector;
+
+   private ContextImpl context;
+
+   
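
Spelled out as a standalone predicate, the bound semantics described in the javadoc read as follows (a sketch for illustration, not code from the PR):

{code:java}
// true iff rightTs lies within [leftTs + lowerBound, leftTs + upperBound],
// with each bound optionally exclusive
static boolean joins(long leftTs, long rightTs,
                     long lowerBound, long upperBound,
                     boolean lowerInclusive, boolean upperInclusive) {
    boolean lowerOk = lowerInclusive ? rightTs >= leftTs + lowerBound : rightTs > leftTs + lowerBound;
    boolean upperOk = upperInclusive ? rightTs <= leftTs + upperBound : rightTs < leftTs + upperBound;
    return lowerOk && upperOk;
}
{code}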

[GitHub] flink pull request #5342: [FLINK-8479] Timebounded stream join

2018-01-30 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164776439
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will 
emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both 
the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined 
{@link JoinedProcessFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the 
right element
+ *
+ * @param <T1> The type of the elements in the left stream
+ * @param <T2> The type of the elements in the right stream
+ * @param <OUT> The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator<T1, T2, OUT>
+   extends AbstractUdfStreamOperator<OUT, JoinedProcessFunction<T1, T2, OUT>>
+   implements TwoInputStreamOperator<T1, T2, OUT> {
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final long inverseLowerBound;
+   private final long inverseUpperBound;
+
+   private final boolean lowerBoundInclusive;
+   private final boolean upperBoundInclusive;
+
+   private final long bucketGranularity = 1;
+
+   private static final String LEFT_BUFFER = "LEFT_BUFFER";
+   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+   private transient ValueState<Long> lastCleanupRightBuffer;
+   private transient ValueState<Long> lastCleanupLeftBuffer;
+
+   private transient MapState<Long, List<Tuple3<T1, Long, Boolean>>> leftBuffer;
+   private transient MapState<Long, List<Tuple3<T2, Long, Boolean>>> rightBuffer;
+
+   private final TypeSerializer<T1> leftTypeSerializer;
+   private final TypeSerializer<T2> rightTypeSerializer;
+
+   private transient TimestampedCollector<OUT> collector;
+
+   private ContextImpl context;
+
+   /**
+* Creates a new TimeBoundedStreamJoinOperator.
+*
+* @param lowerBound  The lower bound for evaluating if 
elements should be joined
+* @param upperBound  The upper bound for 

[jira] [Closed] (FLINK-4765) Migrate ConfigConstants to ConfigOptions

2018-01-30 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-4765.
---
   Resolution: Fixed
Fix Version/s: 1.5.0

The majority of options have been ported, a few outliers are left and will be 
ported as part of FLINK-8475.

> Migrate ConfigConstants to ConfigOptions
> 
>
> Key: FLINK-4765
> URL: https://issues.apache.org/jira/browse/FLINK-4765
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue has multiple subtasks



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-4767) Migrate JobManager configuration options

2018-01-30 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-4767:
---

Assignee: (was: Chesnay Schepler)

> Migrate JobManager configuration options
> 
>
> Key: FLINK-4767
> URL: https://issues.apache.org/jira/browse/FLINK-4767
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Local Runtime
>Reporter: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-4767) Migrate JobManager configuration options

2018-01-30 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-4767.
---
   Resolution: Fixed
Fix Version/s: 1.5.0

Was fixed in the meantime.

> Migrate JobManager configuration options
> 
>
> Key: FLINK-4767
> URL: https://issues.apache.org/jira/browse/FLINK-4767
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Local Runtime
>Reporter: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-4767) Migrate JobManager configuration options

2018-01-30 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-4767:
---

Assignee: Chesnay Schepler

> Migrate JobManager configuration options
> 
>
> Key: FLINK-4767
> URL: https://issues.apache.org/jira/browse/FLINK-4767
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Local Runtime
>Reporter: Stephan Ewen
>Assignee: Chesnay Schepler
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345206#comment-16345206
 ] 

ASF GitHub Bot commented on FLINK-8479:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164773927
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will emit exactly
+ * those pairs (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound].
+ * Both the lower and the upper bound can be configured to be either inclusive
+ * or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined
+ * {@link JoinedProcessFunction}, as a {@link Tuple2}, with f0 being the left
+ * element and f1 being the right element.
+ *
+ * @param <T1>  The type of the elements in the left stream
+ * @param <T2>  The type of the elements in the right stream
+ * @param <OUT> The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator<T1, T2, OUT>
+   extends AbstractUdfStreamOperator<OUT, JoinedProcessFunction<T1, T2, OUT>>
+   implements TwoInputStreamOperator<T1, T2, OUT> {
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final long inverseLowerBound;
+   private final long inverseUpperBound;
+
+   private final boolean lowerBoundInclusive;
+   private final boolean upperBoundInclusive;
+
+   private final long bucketGranularity = 1;
+
+   private static final String LEFT_BUFFER = "LEFT_BUFFER";
+   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+   private transient ValueState<Long> lastCleanupRightBuffer;
+   private transient ValueState<Long> lastCleanupLeftBuffer;
+
+   private transient MapState<Long, List<Tuple3<T1, Long, Boolean>>> leftBuffer;
+   private transient MapState<Long, List<Tuple3<T2, Long, Boolean>>> rightBuffer;
+
+   private final TypeSerializer<T1> leftTypeSerializer;
+   private final TypeSerializer<T2> rightTypeSerializer;
+
+   private transient TimestampedCollector<OUT> collector;
+
+   private ContextImpl context;
+
+ 
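
The bound semantics in the javadoc above reduce to a simple membership test. A minimal sketch, assuming only the bounds and the inclusive/exclusive flags from the excerpt; the class and method names here are illustrative, not the PR's actual code:

{code}
// Membership test for the join window: t2.ts ∈ [t1.ts + lowerBound, t1.ts + upperBound].
// lowerBound/upperBound and the inclusive flags mirror the fields in the excerpt;
// everything else is a hypothetical stand-in.
public final class JoinWindowCheck {

	private final long lowerBound;
	private final long upperBound;
	private final boolean lowerBoundInclusive;
	private final boolean upperBoundInclusive;

	public JoinWindowCheck(long lowerBound, long upperBound,
			boolean lowerBoundInclusive, boolean upperBoundInclusive) {
		this.lowerBound = lowerBound;
		this.upperBound = upperBound;
		this.lowerBoundInclusive = lowerBoundInclusive;
		this.upperBoundInclusive = upperBoundInclusive;
	}

	// Returns true iff rightTs lies within [leftTs + lowerBound, leftTs + upperBound],
	// honoring the inclusive/exclusive configuration of each bound.
	public boolean shouldJoin(long leftTs, long rightTs) {
		long lower = leftTs + lowerBound;
		long upper = leftTs + upperBound;
		boolean aboveLower = lowerBoundInclusive ? rightTs >= lower : rightTs > lower;
		boolean belowUpper = upperBoundInclusive ? rightTs <= upper : rightTs < upper;
		return aboveLower && belowUpper;
	}
}
{code}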

[GitHub] flink pull request #5342: [FLINK-8479] Timebounded stream join

2018-01-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164773927
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will emit exactly
+ * those pairs (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound].
+ * Both the lower and the upper bound can be configured to be either inclusive
+ * or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined
+ * {@link JoinedProcessFunction}, as a {@link Tuple2}, with f0 being the left
+ * element and f1 being the right element.
+ *
+ * @param <T1>  The type of the elements in the left stream
+ * @param <T2>  The type of the elements in the right stream
+ * @param <OUT> The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator<T1, T2, OUT>
+   extends AbstractUdfStreamOperator<OUT, JoinedProcessFunction<T1, T2, OUT>>
+   implements TwoInputStreamOperator<T1, T2, OUT> {
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final long inverseLowerBound;
+   private final long inverseUpperBound;
+
+   private final boolean lowerBoundInclusive;
+   private final boolean upperBoundInclusive;
+
+   private final long bucketGranularity = 1;
+
+   private static final String LEFT_BUFFER = "LEFT_BUFFER";
+   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+   private transient ValueState<Long> lastCleanupRightBuffer;
+   private transient ValueState<Long> lastCleanupLeftBuffer;
+
+   private transient MapState<Long, List<Tuple3<T1, Long, Boolean>>> leftBuffer;
+   private transient MapState<Long, List<Tuple3<T2, Long, Boolean>>> rightBuffer;
+
+   private final TypeSerializer<T1> leftTypeSerializer;
+   private final TypeSerializer<T2> rightTypeSerializer;
+
+   private transient TimestampedCollector<OUT> collector;
+
+   private ContextImpl context;
+
+   /**
+    * Creates a new TimeBoundedStreamJoinOperator.
+    *
+    * @param lowerBound  The lower bound for evaluating if elements should be joined
+    * @param upperBound  The upper bound for 

[jira] [Commented] (FLINK-8475) Move remaining sections to generated tables

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345198#comment-16345198
 ] 

ASF GitHub Bot commented on FLINK-8475:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5392

[FLINK-8475][config][docs] Integrate JM options

## What is the purpose of the change

This PR integrates the JobManager `ConfigOptions` into the configuration 
docs generator.

## Brief change log

* Add missing descriptions to config options (derived from existing 
description/javadocs)
* integrate jobmanager configuration table into `config.md` and separate 
job- and taskmanager sections
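
For context, the docs generator consumes options defined through the ConfigOptions builder, so a documented option looks roughly like the sketch below. The key, default, and description text are illustrative stand-ins, not taken from the PR, and the sketch assumes ConfigOption#withDescription is available:

{code}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class JobManagerOptionsSketch {

	// Illustrative option: the generator renders key, default, and description
	// into the configuration reference table.
	public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY_MB =
		ConfigOptions.key("jobmanager.heap.mb")
			.defaultValue(1024)
			.withDescription("JVM heap size (in megabytes) for the JobManager.");
}
{code}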

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8475_jm

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5392.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5392


commit 61bb0eff965cd066edebc5e4a167dd9bd3a07f77
Author: zentol 
Date:   2018-01-22T16:32:38Z

[FLINK-8475][config][docs] Integrate JM options




> Move remaining sections to generated tables
> ---
>
> Key: FLINK-8475
> URL: https://issues.apache.org/jira/browse/FLINK-8475
> Project: Flink
>  Issue Type: Sub-task
>  Components: Configuration
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5392: [FLINK-8475][config][docs] Integrate JM options

2018-01-30 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5392

[FLINK-8475][config][docs] Integrate JM options

## What is the purpose of the change

This PR integrates the JobManager `ConfigOptions` into the configuration 
docs generator.

## Brief change log

* Add missing descriptions to config options (derived from existing 
description/javadocs)
* integrate jobmanager configuration table into `config.md` and separate 
job- and taskmanager sections

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8475_jm

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5392.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5392


commit 61bb0eff965cd066edebc5e4a167dd9bd3a07f77
Author: zentol 
Date:   2018-01-22T16:32:38Z

[FLINK-8475][config][docs] Integrate JM options




---


[jira] [Commented] (FLINK-8475) Move remaining sections to generated tables

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345185#comment-16345185
 ] 

ASF GitHub Bot commented on FLINK-8475:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5391

[FLINK-8475][config][docs] Integrate BlobServer options

## What is the purpose of the change

This PR adds the BlobServer `ConfigOptions` to the full configuration 
reference.

## Brief change log

* Add missing descriptions to config options (derived from existing 
description/javadocs)
* integrate BlobServer configuration table into `config.md`

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8475_blob

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5391.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5391


commit 4646481d5c398ee27c2c1600eb23e10009074e20
Author: zentol 
Date:   2018-01-23T13:44:00Z

[FLINK-8475][config][docs] Integrate BlobServer options




> Move remaining sections to generated tables
> ---
>
> Key: FLINK-8475
> URL: https://issues.apache.org/jira/browse/FLINK-8475
> Project: Flink
>  Issue Type: Sub-task
>  Components: Configuration
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5391: [FLINK-8475][config][docs] Integrate BlobServer op...

2018-01-30 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5391

[FLINK-8475][config][docs] Integrate BlobServer options

## What is the purpose of the change

This PR adds the BlobServer `ConfigOptions` to the full configuration 
reference.

## Brief change log

* Add missing descriptions to config options (derived from existing 
description/javadocs)
* integrate BlobServer configuration table into `config.md`

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8475_blob

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5391.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5391


commit 4646481d5c398ee27c2c1600eb23e10009074e20
Author: zentol 
Date:   2018-01-23T13:44:00Z

[FLINK-8475][config][docs] Integrate BlobServer options




---


[jira] [Commented] (FLINK-8476) ConfigConstants#DEFAULT_HA_JOB_MANAGER_PORT unused

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345170#comment-16345170
 ] 

ASF GitHub Bot commented on FLINK-8476:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5338


> ConfigConstants#DEFAULT_HA_JOB_MANAGER_PORT unused
> --
>
> Key: FLINK-8476
> URL: https://issues.apache.org/jira/browse/FLINK-8476
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.5.0
>
>
> {{ConfigConstants#DEFAULT_HA_JOB_MANAGER_PORT}} is unused and should probably 
> be deprecated.
> [~till.rohrmann]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-6623) BlobCacheSuccessTest fails on Windows

2018-01-30 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-6623.
---
   Resolution: Fixed
Fix Version/s: 1.5.0

master: e135c3a5e2a42911f4c7f744003f5804c25d2dd9

> BlobCacheSuccessTest fails on Windows
> -
>
> Key: FLINK-6623
> URL: https://issues.apache.org/jira/browse/FLINK-6623
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0, 1.5.0
> Environment: windows 10, java 1.8
>Reporter: constantine stanley
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>
> All tests in {{BlobCacheSuccessTest}} fail on Windows.
> {code}
> java.nio.file.FileSystemException: 
> C:\Users\Zento\AppData\Local\Temp\junit1683251984771313204\junit8061309906182960047\blobStore-04a01d13-96c8-4c0d-b209-5ea0bf5e534d\incoming\temp-
>  -> 
> C:\Users\Zento\AppData\Local\Temp\junit1683251984771313204\junit8061309906182960047\blobStore-04a01d13-96c8-4c0d-b209-5ea0bf5e534d\job_a8fef824a8e43a546dfa05d0c8b73e57\blob_p-0ae4f711ef5d6e9d26c611fd2c8c8ac45ecbf9e7-cd525d0173571dc24f4c0723130892af:
>  The process cannot access the file because it is being used by another 
> process.
>   at 
> sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
>   at 
> sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
>   at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
>   at 
> sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
>   at java.nio.file.Files.move(Files.java:1395)
>   at 
> org.apache.flink.runtime.blob.BlobUtils.moveTempFileToStore(BlobUtils.java:464)
>   at 
> org.apache.flink.runtime.blob.BlobServer.moveTempFileToStore(BlobServer.java:708)
>   at 
> org.apache.flink.runtime.blob.BlobServer.putBuffer(BlobServer.java:608)
>   at 
> org.apache.flink.runtime.blob.BlobServer.putPermanent(BlobServer.java:568)
>   at 
> org.apache.flink.runtime.blob.BlobServerPutTest.put(BlobServerPutTest.java:778)
>   at 
> org.apache.flink.runtime.blob.BlobCacheSuccessTest.uploadFileGetTest(BlobCacheSuccessTest.java:173)
>   at 
> org.apache.flink.runtime.blob.BlobCacheSuccessTest.testBlobForJobCacheHa(BlobCacheSuccessTest.java:90)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}
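
The failure above is the classic Windows rename-while-open race: Files.move fails while another process still holds the target open. A hedged sketch of one way to tolerate it (not necessarily the fix that was merged in #5351): since blobs are content-addressed, an already-existing target implies identical content, so the temp file can simply be discarded.

{code}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class WindowsSafeMove {

	// Sketch only: move the incoming temp file into the blob store, tolerating
	// the Windows case where the target is held open by a concurrent reader.
	static void moveTempFileToStore(Path incoming, Path target) throws IOException {
		try {
			Files.move(incoming, target);
		} catch (IOException e) {
			// Blobs are content-addressed: if the target already exists it has
			// the same content, so the temp file is redundant.
			if (Files.exists(target)) {
				Files.deleteIfExists(incoming);
			} else {
				throw e;
			}
		}
	}
}
{code}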



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345166#comment-16345166
 ] 

ASF GitHub Bot commented on FLINK-8479:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164766198
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will emit exactly
+ * those pairs (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound].
+ * Both the lower and the upper bound can be configured to be either inclusive
+ * or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined
+ * {@link JoinedProcessFunction}, as a {@link Tuple2}, with f0 being the left
+ * element and f1 being the right element.
+ *
+ * @param <T1>  The type of the elements in the left stream
+ * @param <T2>  The type of the elements in the right stream
+ * @param <OUT> The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator<T1, T2, OUT>
+   extends AbstractUdfStreamOperator<OUT, JoinedProcessFunction<T1, T2, OUT>>
+   implements TwoInputStreamOperator<T1, T2, OUT> {
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final long inverseLowerBound;
+   private final long inverseUpperBound;
+
+   private final boolean lowerBoundInclusive;
+   private final boolean upperBoundInclusive;
+
+   private final long bucketGranularity = 1;
+
+   private static final String LEFT_BUFFER = "LEFT_BUFFER";
+   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+   private transient ValueState<Long> lastCleanupRightBuffer;
+   private transient ValueState<Long> lastCleanupLeftBuffer;
+
+   private transient MapState<Long, List<Tuple3<T1, Long, Boolean>>> leftBuffer;
+   private transient MapState<Long, List<Tuple3<T2, Long, Boolean>>> rightBuffer;
+
+   private final TypeSerializer<T1> leftTypeSerializer;
+   private final TypeSerializer<T2> rightTypeSerializer;
+
+   private transient TimestampedCollector<OUT> collector;
+
+   private ContextImpl context;
+
+   

[GitHub] flink pull request #5351: [FLINK-6623][Blob] BlobServer#putBuffer moves file...

2018-01-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5351


---


[jira] [Commented] (FLINK-8494) Migrate CC#DEFAULT_PARALLELISM_KEY

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345172#comment-16345172
 ] 

ASF GitHub Bot commented on FLINK-8494:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5377


> Migrate CC#DEFAULT_PARALLELISM_KEY
> --
>
> Key: FLINK-8494
> URL: https://issues.apache.org/jira/browse/FLINK-8494
> Project: Flink
>  Issue Type: Sub-task
>  Components: Configuration
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>
> {{ConfigConstants#DEFAULT_PARALLELISM_KEY}} was only partially migrated to a 
> ConfigOption in FLINK-4770.
> A ConfigOption was created but the usages weren't replaced. The option is 
> also called {{DEFAULT_PARALLELISM_KEY}} when it should be called 
> {{DEFAULT_PARALLELISM}} to be consistent with all other config options.
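
A sketch of what the completed migration could look like, assuming the usual ConfigOptions builder; the key and default mirror what FLINK-4770 introduced, but treat the exact placement and shape as illustrative:

{code}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class CoreOptionsSketch {

	// Renamed from DEFAULT_PARALLELISM_KEY; remaining usages of the raw string
	// key would be replaced by references to this option.
	public static final ConfigOption<Integer> DEFAULT_PARALLELISM =
		ConfigOptions.key("parallelism.default")
			.defaultValue(1);
}
{code}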



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6623) BlobCacheSuccessTest fails on Windows

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345171#comment-16345171
 ] 

ASF GitHub Bot commented on FLINK-6623:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5351


> BlobCacheSuccessTest fails on Windows
> -
>
> Key: FLINK-6623
> URL: https://issues.apache.org/jira/browse/FLINK-6623
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0, 1.5.0
> Environment: windows 10, java 1.8
>Reporter: constantine stanley
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>
> All tests in {{BlobCacheSuccessTest}} fail on Windows.
> {code}
> java.nio.file.FileSystemException: 
> C:\Users\Zento\AppData\Local\Temp\junit1683251984771313204\junit8061309906182960047\blobStore-04a01d13-96c8-4c0d-b209-5ea0bf5e534d\incoming\temp-
>  -> 
> C:\Users\Zento\AppData\Local\Temp\junit1683251984771313204\junit8061309906182960047\blobStore-04a01d13-96c8-4c0d-b209-5ea0bf5e534d\job_a8fef824a8e43a546dfa05d0c8b73e57\blob_p-0ae4f711ef5d6e9d26c611fd2c8c8ac45ecbf9e7-cd525d0173571dc24f4c0723130892af:
>  The process cannot access the file because it is being used by another 
> process.
>   at 
> sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
>   at 
> sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
>   at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
>   at 
> sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
>   at java.nio.file.Files.move(Files.java:1395)
>   at 
> org.apache.flink.runtime.blob.BlobUtils.moveTempFileToStore(BlobUtils.java:464)
>   at 
> org.apache.flink.runtime.blob.BlobServer.moveTempFileToStore(BlobServer.java:708)
>   at 
> org.apache.flink.runtime.blob.BlobServer.putBuffer(BlobServer.java:608)
>   at 
> org.apache.flink.runtime.blob.BlobServer.putPermanent(BlobServer.java:568)
>   at 
> org.apache.flink.runtime.blob.BlobServerPutTest.put(BlobServerPutTest.java:778)
>   at 
> org.apache.flink.runtime.blob.BlobCacheSuccessTest.uploadFileGetTest(BlobCacheSuccessTest.java:173)
>   at 
> org.apache.flink.runtime.blob.BlobCacheSuccessTest.testBlobForJobCacheHa(BlobCacheSuccessTest.java:90)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-6464) Metric name is not stable

2018-01-30 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-6464.
---
   Resolution: Fixed
Fix Version/s: (was: 1.4.1)

master: 7d4bd4b6d6710a7e81a7883aee1947a76d564b9a

> Metric name is not stable
> -
>
> Key: FLINK-6464
> URL: https://issues.apache.org/jira/browse/FLINK-6464
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Metrics
>Affects Versions: 1.2.0
>Reporter: Andrey
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.5.0
>
>
> Currently, according to the documentation 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html)
>  operator metrics are constructed using the following pattern:
> , 
> For some operators, "operator_name" can contain the default implementation of 
> the toString method. For example:
> {code}
> TriggerWindow(TumblingProcessingTimeWindows(3000), 
> ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@c65792d4},
>  xxx.Trigger@665fe457, WindowedStream.apply(WindowedStream.java:521)) -> 
> Sink: Unnamed
> {code}
> The part "@c65792d4" changes every time the job is restarted/cancelled. 
> As a consequence, it's not possible to store metrics for a long time.
> Expected:
> * ensure all operators return human-readable, non-default names, OR
> * change the way TriggerWindow generates its name.
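
To see why the default name is unstable: Object#toString prints ClassName@identityHashCode, and the identity hash differs per JVM instance, so every restart yields a new metric name. A minimal, self-contained illustration (not Flink code):

{code}
public class NameStability {

	public static void main(String[] args) {
		// Prints e.g. java.lang.Object@c65792d4 -- the hex part is the identity
		// hash code, which changes between JVM runs, hence unstable metric names.
		Object trigger = new Object();
		System.out.println(trigger.toString());
	}
}
{code}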



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6464) Metric name is not stable

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345169#comment-16345169
 ] 

ASF GitHub Bot commented on FLINK-6464:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5332


> Metric name is not stable
> -
>
> Key: FLINK-6464
> URL: https://issues.apache.org/jira/browse/FLINK-6464
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Metrics
>Affects Versions: 1.2.0
>Reporter: Andrey
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.5.0
>
>
> Currently, according to the documentation 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html)
>  operator metrics are constructed using the following pattern:
> , 
> For some operators, "operator_name" can contain the default implementation of 
> the toString method. For example:
> {code}
> TriggerWindow(TumblingProcessingTimeWindows(3000), 
> ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@c65792d4},
>  xxx.Trigger@665fe457, WindowedStream.apply(WindowedStream.java:521)) -> 
> Sink: Unnamed
> {code}
> The part "@c65792d4" changes every time the job is restarted/cancelled. 
> As a consequence, it's not possible to store metrics for a long time.
> Expected:
> * ensure all operators return human-readable, non-default names, OR
> * change the way TriggerWindow generates its name.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5338: [FLINK-8476][config][HA] Deprecate HA config const...

2018-01-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5338


---


[jira] [Closed] (FLINK-5659) FileBaseUtils#deleteFileOrDirectory not thread-safe on Windows

2018-01-30 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-5659.
---
   Resolution: Fixed
Fix Version/s: 1.5.0

master: 2e63d5a8ec1fa874a61061b72b970879f14c86d9

> FileBaseUtils#deleteFileOrDirectory not thread-safe on Windows
> --
>
> Key: FLINK-5659
> URL: https://issues.apache.org/jira/browse/FLINK-5659
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Local Runtime
>Affects Versions: 1.2.0, 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.5.0
>
>
> {code}FileBaseUtils#deleteFileOrDirectory{code} is not thread-safe on 
> Windows.
> First you will run into AccessDeniedExceptions, since one thread tries to 
> delete a file while another thread is already doing so, which requires the 
> file to be opened.
> Once you resolve those exceptions (by catching them and double-checking 
> whether the file still exists), you run into DirectoryNotEmptyExceptions, 
> since there is some subtle timing/visibility issue when deleting files 
> concurrently.
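
A hedged sketch of the kind of defensive deletion the description implies, i.e. catching the exception and re-checking existence; this is illustrative, not the committed fix:

{code}
import java.io.IOException;
import java.nio.file.AccessDeniedException;
import java.nio.file.Files;
import java.nio.file.Path;

final class ConcurrentDelete {

	// Swallow AccessDeniedException only when a racing thread has already
	// deleted the file; otherwise propagate it.
	static void deleteQuietlyIfRacing(Path file) throws IOException {
		try {
			Files.deleteIfExists(file);
		} catch (AccessDeniedException e) {
			if (Files.exists(file)) {
				throw e;
			}
		}
	}
}
{code}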



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5377: [FLINK-8494][config] Migrate CC#DEFAULT_PARALLELIS...

2018-01-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5377


---


[GitHub] flink pull request #5332: [FLINK-6464][streaming] Stabilize default window o...

2018-01-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5332


---


[jira] [Closed] (FLINK-8476) ConfigConstants#DEFAULT_HA_JOB_MANAGER_PORT unused

2018-01-30 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8476.
---
Resolution: Fixed

master: 83eb8e143ef64e56974a334ca96e10c011f4a32c

> ConfigConstants#DEFAULT_HA_JOB_MANAGER_PORT unused
> --
>
> Key: FLINK-8476
> URL: https://issues.apache.org/jira/browse/FLINK-8476
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.5.0
>
>
> {{ConfigConstants#DEFAULT_HA_JOB_MANAGER_PORT}} is unused and should probably 
> be deprecated.
> [~till.rohrmann]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5342: [FLINK-8479] Timebounded stream join

2018-01-30 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5342#discussion_r164766198
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+
+// TODO: Make bucket granularity adaptable
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will emit exactly
+ * those pairs (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound].
+ * Both the lower and the upper bound can be configured to be either inclusive
+ * or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined
+ * {@link JoinedProcessFunction}, as a {@link Tuple2}, with f0 being the left
+ * element and f1 being the right element.
+ *
+ * @param <T1>  The type of the elements in the left stream
+ * @param <T2>  The type of the elements in the right stream
+ * @param <OUT> The output type created by the user-defined function
+ */
+public class TimeBoundedStreamJoinOperator<T1, T2, OUT>
+   extends AbstractUdfStreamOperator<OUT, JoinedProcessFunction<T1, T2, OUT>>
+   implements TwoInputStreamOperator<T1, T2, OUT> {
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final long inverseLowerBound;
+   private final long inverseUpperBound;
+
+   private final boolean lowerBoundInclusive;
+   private final boolean upperBoundInclusive;
+
+   private final long bucketGranularity = 1;
+
+   private static final String LEFT_BUFFER = "LEFT_BUFFER";
+   private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+   private static final String LAST_CLEANUP_LEFT = "LAST_CLEANUP_LEFT";
+   private static final String LAST_CLEANUP_RIGHT = "LAST_CLEANUP_RIGHT";
+
+   private transient ValueState<Long> lastCleanupRightBuffer;
+   private transient ValueState<Long> lastCleanupLeftBuffer;
+
+   private transient MapState<Long, List<Tuple3<T1, Long, Boolean>>> leftBuffer;
+   private transient MapState<Long, List<Tuple3<T2, Long, Boolean>>> rightBuffer;
+
+   private final TypeSerializer<T1> leftTypeSerializer;
+   private final TypeSerializer<T2> rightTypeSerializer;
+
+   private transient TimestampedCollector<OUT> collector;
+
+   private ContextImpl context;
+
+   /**
+    * Creates a new TimeBoundedStreamJoinOperator.
+    *
+    * @param lowerBound  The lower bound for evaluating if elements should be joined
+    * @param upperBound  The upper bound for 

[jira] [Closed] (FLINK-8494) Migrate CC#DEFAULT_PARALLELISM_KEY

2018-01-30 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8494.
---
Resolution: Fixed

master: 0e20b613087e1b326e05674e3d532ea4aa444bc3

> Migrate CC#DEFAULT_PARALLELISM_KEY
> --
>
> Key: FLINK-8494
> URL: https://issues.apache.org/jira/browse/FLINK-8494
> Project: Flink
>  Issue Type: Sub-task
>  Components: Configuration
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>
> {{ConfigConstants#DEFAULT_PARALLELISM_KEY}} was only partially migrated to a 
> ConfigOption in FLINK-4770.
> A ConfigOption was created but the usages weren't replaced. The option is 
> also called {{DEFAULT_PARALLELISM_KEY}} when it should be called 
> {{DEFAULT_PARALLELISM}} to be consistent with all other config options.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8407) Setting the parallelism after a partitioning operation should be forbidden

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345158#comment-16345158
 ] 

ASF GitHub Bot commented on FLINK-8407:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5369
  
That makes sense to me. I just wonder what you mean by "add a test for 
the Java API"...


> Setting the parallelism after a partitioning operation should be forbidden
> --
>
> Key: FLINK-8407
> URL: https://issues.apache.org/jira/browse/FLINK-8407
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Major
>
> Partitioning operations ({{shuffle}}, {{rescale}}, etc.) on a {{DataStream}} 
> create new {{DataStreams}}, which allow users to set parallelisms for 
> them. However, the {{PartitionTransformations}} in these returned 
> {{DataStreams}} only add virtual nodes, whose parallelisms cannot be 
> specified, to the execution graph. We should forbid users from setting the 
> parallelism after a partitioning operation, since such settings won't take 
> effect. The corresponding documentation should also be updated.
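
A sketch of the guard this issue asks for, using simplified stand-in types rather than Flink's real DataStream/PartitionTransformation classes:

{code}
// Simplified stand-ins, not Flink's actual classes.
class Transformation {
	final boolean isPartitioning; // true for shuffle/rescale/rebalance/...
	int parallelism;

	Transformation(boolean isPartitioning) {
		this.isPartitioning = isPartitioning;
	}
}

class SketchStream {
	private final Transformation transformation;

	SketchStream(Transformation transformation) {
		this.transformation = transformation;
	}

	SketchStream setParallelism(int parallelism) {
		if (transformation.isPartitioning) {
			throw new UnsupportedOperationException(
				"Setting the parallelism after a partitioning operation is forbidden: "
					+ "the partitioning only adds a virtual node, so the value would be ignored.");
		}
		transformation.parallelism = parallelism;
		return this;
	}
}
{code}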



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8475) Move remaining sections to generated tables

2018-01-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345155#comment-16345155
 ] 

ASF GitHub Bot commented on FLINK-8475:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5384
  
All the more reason for the more advanced configuration approach, where you can 
specify such things directly in the configuration code.

Changes look good now, though.


> Move remaining sections to generated tables
> ---
>
> Key: FLINK-8475
> URL: https://issues.apache.org/jira/browse/FLINK-8475
> Project: Flink
>  Issue Type: Sub-task
>  Components: Configuration
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5369: [FLINK-8407][DataStream]Setting the parallelism after a p...

2018-01-30 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5369
  
That makes sense to me. I just wonder what you mean by "add a test for 
the Java API"...


---

