[jira] [Created] (FLINK-7597) broken flink-connectors-kinesis setup in Intellij that potentially results from improper pom.xml

2017-09-06 Thread Bowen Li (JIRA)
Bowen Li created FLINK-7597:
---

 Summary: broken flink-connectors-kinesis setup in Intellij that 
potentially results from improper pom.xml
 Key: FLINK-7597
 URL: https://issues.apache.org/jira/browse/FLINK-7597
 Project: Flink
  Issue Type: Bug
  Components: Kinesis Connector
Affects Versions: 1.3.2, 1.4.0
Reporter: Bowen Li
Assignee: Bowen Li


I use IntelliJ to develop Flink and flink-connectors-kinesis. I imported the 
whole Flink source code into IntelliJ, and IntelliJ treats 
flink-connectors-kinesis as a module. The project structure in IntelliJ looks 
like this: https://imgur.com/a/uK3Fd

Here's the problem: the {{flink-connectors-kinesis}} module always complains 
about not being able to find dependencies like amazon-kinesis-producer, 
amazon-kinesis-client, flink-streaming-java_2.11, etc. It seems IntelliJ 
cannot properly parse {{/flink-connectors-kinesis/pom.xml}}, and it always 
suggests adding those dependencies to {{flink-connectors/pom.xml}}. In 
short, {{flink-connectors-kinesis}} won't compile in my IntelliJ until I add 
those dependencies to {{flink-connectors/pom.xml}}.

My {{flink-connectors/pom.xml}} ends up looking like this every time:

{code:java}
C02SD32LG8WP:flink-connectors Bowen$ git diff
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index bc3f82f..2b001f5 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -71,6 +71,16 @@ under the License.
 			<artifactId>jsr305</artifactId>
 			<scope>provided</scope>
 		</dependency>
+		<dependency>
+			<groupId>com.amazonaws</groupId>
+			<artifactId>amazon-kinesis-producer</artifactId>
+			<version>0.12.5</version>
+		</dependency>
+		<dependency>
+			<groupId>com.amazonaws</groupId>
+			<artifactId>amazon-kinesis-client</artifactId>
+			<version>1.8.1</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.11</artifactId>
+			<version>1.4-SNAPSHOT</version>
+		</dependency>
{code}

FYI, building flink-connectors-kinesis from the command line always works. 

[~tzulitai] Do you use IntelliJ? If so, how do you properly set up the 
flink-connectors-kinesis project in IntelliJ so that it can resolve its 
dependencies?




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)

2017-09-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156546#comment-16156546
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-7367:


Merged to {{master}} via 59eab45458b3b1637ccbc5dafd326cc84ffb9655.
Thanks a lot for your work [~phoenixjiangnan]!

> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, 
> MaxConnections, RequestTimeout, etc)
> ---
>
> Key: FLINK-7367
> URL: https://issues.apache.org/jira/browse/FLINK-7367
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0
>
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying 
> KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> According to the [AWS 
> doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] 
> and [their sample on 
> github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties],
> developers can set many more to make the best use of KinesisProducer and make 
> it fault-tolerant (e.g. by increasing the timeout).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> Flink currently uses KPL's default values, which make Flink write too fast to 
> Kinesis and fail the Flink job too frequently. We need to parameterize 
> FlinkKinesisProducer to pass in the above params, in order to slow down 
> Flink's write rate to Kinesis.
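For readers of this thread, here is a rough sketch of what passing such extra settings through the producer's {{Properties}} could look like. The property keys mirror KPL's {{KinesisProducerConfiguration}} / default_config.properties names and are assumptions, not necessarily the exact keys supported by the merged change:

{code:java}
// Hedged sketch: supplying additional KPL tuning knobs via the same Properties
// object that already carries the AWS region/credentials configuration.
import java.util.Properties;

import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KinesisProducerConfigExample {

    public static FlinkKinesisProducer<String> buildProducer() {
        Properties config = new Properties();
        config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

        // Additional KPL settings this ticket proposes to expose
        // (keys follow KPL's default_config.properties naming).
        config.setProperty("MaxConnections", "24");
        config.setProperty("RateLimit", "100");
        config.setProperty("RecordMaxBufferedTime", "1000");
        config.setProperty("RecordTtl", "60000");
        config.setProperty("RequestTimeout", "10000");

        FlinkKinesisProducer<String> producer =
                new FlinkKinesisProducer<>(new SimpleStringSchema(), config);
        producer.setDefaultStream("example-stream");
        producer.setDefaultPartition("0");
        return producer;
    }
}
{code}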



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)

2017-09-06 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-7367.
--
   Resolution: Fixed
Fix Version/s: (was: 1.3.3)

Since this includes deprecation, I will not merge it for {{1.3.2}}.

> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, 
> MaxConnections, RequestTimeout, etc)
> ---
>
> Key: FLINK-7367
> URL: https://issues.apache.org/jira/browse/FLINK-7367
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0
>
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying 
> KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> According to the [AWS 
> doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] 
> and [their sample on 
> github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties],
> developers can set many more to make the best use of KinesisProducer and make 
> it fault-tolerant (e.g. by increasing the timeout).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> Flink currently uses KPL's default values, which make Flink write too fast to 
> Kinesis and fail the Flink job too frequently. We need to parameterize 
> FlinkKinesisProducer to pass in the above params, in order to slow down 
> Flink's write rate to Kinesis.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7440) Add eager serializable checks on provided de-/serialization schemas for Kinesis consumer / producer

2017-09-06 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-7440.
--
   Resolution: Fixed
Fix Version/s: (was: 1.3.3)

Merged to master via 98737f9a875f1899cb14b3dcef1bd2ac1c6530ba.

> Add eager serializable checks on provided de-/serialization schemas for 
> Kinesis consumer / producer
> ---
>
> Key: FLINK-7440
> URL: https://issues.apache.org/jira/browse/FLINK-7440
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.4.0
>
>
> For better user experience, we should add eager serializable checks on the 
> provided {{KinesisDeserializationSchema}} / {{KinesisSerializationSchema}}, 
> with better error messages pointing out exactly that the serialization schema 
> isn't serializable.
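As an illustration of the kind of check described above, a minimal eager serializability probe could look like the following. This is a sketch with made-up names, not the code that was merged for this ticket:

{code:java}
// Hedged sketch of an "eager" serializability check: fail fast in the
// consumer/producer constructor with a pointed error message, instead of
// failing later when the schema is shipped to the cluster.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public final class SerializableChecks {

    private SerializableChecks() {}

    public static void checkSerializable(Serializable schema, String argumentName) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            // Attempt Java serialization; a NotSerializableException surfaces here
            // instead of at job submission / task deployment time.
            out.writeObject(schema);
        } catch (IOException e) {
            throw new IllegalArgumentException(
                    "The provided " + argumentName + " ("
                            + schema.getClass().getName()
                            + ") is not serializable. The schema is shipped to the Flink "
                            + "cluster with the job, so it must be java.io.Serializable "
                            + "(e.g. avoid capturing non-serializable fields).",
                    e);
        }
    }
}

// e.g. in a constructor:
// SerializableChecks.checkSerializable(deserializationSchema, "KinesisDeserializationSchema");
{code}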



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7407) Assumption of partition id strict contiguity is too naive in Kafka consumer's AbstractPartitionDiscoverer

2017-09-06 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-7407.
--
Resolution: Fixed

> Assumption of partition id strict contiguity is too naive in Kafka consumer's 
> AbstractPartitionDiscoverer
> -
>
> Key: FLINK-7407
> URL: https://issues.apache.org/jira/browse/FLINK-7407
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.4.0
>
>
> In the Kafka Consumer's {{AbstractPartitionDiscoverer}}, for partition 
> discovery, already discovered partitions are tracked with the following map:
> {code}
> Map topicsToLargestDiscoveredPartitionId
> {code}
> Simply put, on each discovery attempt's metadata fetch, all partition ids of 
> a given topic that are smaller than the largest seen id will be ignored and 
> not assigned. This approach relies on the assumption that fetched partition ids 
> of a single topic are always strictly contiguous starting from 0.
> This assumption may be too naive, in that partitions which were temporarily 
> unavailable at the time of a discovery would be shadowed by available 
> partitions with larger ids, and from then on would be left unassigned.
> We should redesign how the {{AbstractPartitionDiscoverer}} tracks discovered 
> partitions by not relying on the contiguity assumption, and also add test 
> cases for non-contiguous fetched partition ids.
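A minimal sketch of the redesign direction described above: track every discovered partition in a set instead of only the largest partition id per topic, so partitions that show up late (with non-contiguous ids) are still assigned. Class and method names here are illustrative, not the actual {{AbstractPartitionDiscoverer}} code:

{code:java}
// Hedged sketch: set-based tracking that does not assume contiguous partition ids.
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

public class DiscoveredPartitionTracker {

    // Every (topic, partition) pair ever discovered, regardless of its id.
    private final Set<TopicPartition> discoveredPartitions = new HashSet<>();

    /**
     * Returns true if this partition has not been seen before and should be
     * assigned to a subtask; returns false if it was already discovered.
     */
    public boolean setAndCheckDiscoveredPartition(TopicPartition partition) {
        return discoveredPartitions.add(partition);
    }
}
{code}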



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7407) Assumption of partition id strict contiguity is too naive in Kafka consumer's AbstractPartitionDiscoverer

2017-09-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156543#comment-16156543
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-7407:


Merged to {{master}} via 93369e79eb21f17791ddad4e03a18980be6eabfb.

> Assumption of partition id strict contiguity is too naive in Kafka consumer's 
> AbstractPartitionDiscoverer
> -
>
> Key: FLINK-7407
> URL: https://issues.apache.org/jira/browse/FLINK-7407
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.4.0
>
>
> In the Kafka Consumer's {{AbstractPartitionDiscoverer}}, for partition 
> discovery, already discovered partitions are tracked with the following map:
> {code}
> Map topicsToLargestDiscoveredPartitionId
> {code}
> Simply put, on each discovery attempt's metadata fetch, all partition ids of 
> a given topic that are smaller than the largest seen id will be ignored and 
> not assigned. This approach relies on the assumption that fetched partition ids 
> of a single topic are always strictly contiguous starting from 0.
> This assumption may be too naive, in that partitions which were temporarily 
> unavailable at the time of a discovery would be shadowed by available 
> partitions with larger ids, and from then on would be left unassigned.
> We should redesign how the {{AbstractPartitionDiscoverer}} tracks discovered 
> partitions by not relying on the contiguity assumption, and also add test 
> cases for non-contiguous fetched partition ids.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4473: [FLINK-7367][kinesis connector] Parameterize more ...

2017-09-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4473


---


[jira] [Commented] (FLINK-7440) Add eager serializable checks on provided de-/serialization schemas for Kinesis consumer / producer

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156540#comment-16156540
 ] 

ASF GitHub Bot commented on FLINK-7440:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4537


> Add eager serializable checks on provided de-/serialization schemas for 
> Kinesis consumer / producer
> ---
>
> Key: FLINK-7440
> URL: https://issues.apache.org/jira/browse/FLINK-7440
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.4.0, 1.3.3
>
>
> For better user experience, we should add eager serializable checks on the 
> provided {{KinesisDeserializationSchema}} / {{KinesisSerializationSchema}}, 
> with better error messages pointing out exactly that the serialization schema 
> isn't serializable.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4537: [FLINK-7440] [kinesis] Add various eager serializa...

2017-09-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4537


---


[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156541#comment-16156541
 ] 

ASF GitHub Bot commented on FLINK-7367:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4473


> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, 
> MaxConnections, RequestTimeout, etc)
> ---
>
> Key: FLINK-7367
> URL: https://issues.apache.org/jira/browse/FLINK-7367
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0, 1.3.3
>
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying 
> KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> According to the [AWS 
> doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] 
> and [their sample on 
> github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties],
> developers can set many more to make the best use of KinesisProducer and make 
> it fault-tolerant (e.g. by increasing the timeout).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> Flink currently uses KPL's default values, which make Flink write too fast to 
> Kinesis and fail the Flink job too frequently. We need to parameterize 
> FlinkKinesisProducer to pass in the above params, in order to slow down 
> Flink's write rate to Kinesis.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7508) switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode rather than Per_Request mode

2017-09-06 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-7508:

Fix Version/s: (was: 1.3.3)

> switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode 
> rather than Per_Request mode
> 
>
> Key: FLINK-7508
> URL: https://issues.apache.org/jira/browse/FLINK-7508
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.4.0
>
>
> KinesisProducerLibrary (KPL) 0.10.x had been using a 
> One-New-Thread-Per-Request model for all requests sent to AWS Kinesis, which 
> is very expensive.
> 0.12.4 introduced a new [ThreadingMode - 
> Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225],
>  which will use a thread pool. This hugely improves KPL's performance and 
> reduces consumed resources. By default, KPL still uses per-request mode. We 
> should explicitly switch FlinkKinesisProducer's KPL threading mode to 
> 'Pooled'.
> This work depends on FLINK-7366 and FLINK-7508
> Benchmarking I did:
> * Environment: Running a Flink hourly-sliding windowing job on an 18-node EMR 
> cluster with R4.2xlarge instances. Each hourly-sliding window in the Flink 
> job generates about 21 million UserRecords, which means we generated a test 
> load of 21 million UserRecords in the first minute of each hour.
> * Criteria: Test KPL throughput per minute. Since the default RecordTTL for 
> KPL is 30 sec, we can be sure that either all UserRecords are sent by KPL 
> within a minute, or we will see UserRecord expiration errors.
> * One-New-Thread-Per-Request model: max throughput is about 2 million 
> UserRecords per minute; it doesn't go beyond that because CPU utilization goes 
> to 100%, everything stops working, and the Flink job crashes.
> * Thread-Pool model: it sends out 21 million UserRecords within 30 sec without 
> any UserRecord expiration errors. The average peak CPU utilization is about 
> 20% - 30%, so 21 million UserRecords/min is not the max throughput of the 
> thread-pool model. We didn't go any further because 1) this throughput is 
> already a couple of times more than what we really need, and 2) we don't have 
> a quick way of increasing the test load.
> Thus, I propose switching FlinkKinesisProducer to Thread-Pool mode. 
> [~tzulitai] What do you think?
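For illustration, switching the threading model at the KPL level might look roughly like the following. The method names follow KPL 0.12.x's {{KinesisProducerConfiguration}} as linked above and should be treated as assumptions rather than the exact code the connector would use:

{code:java}
// Hedged sketch: explicitly selecting KPL's pooled threading model instead of
// the default one-new-thread-per-request mode.
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;

public class PooledThreadingModelExample {

    public static KinesisProducer createPooledProducer() {
        KinesisProducerConfiguration config = new KinesisProducerConfiguration()
                .setRegion("us-east-1")
                // Default is PER_REQUEST (one new thread per request); use a pool instead.
                .setThreadingModel(KinesisProducerConfiguration.ThreadingModel.POOLED)
                .setThreadPoolSize(10);

        return new KinesisProducer(config);
    }
}
{code}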



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7508) switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode rather than Per_Request mode

2017-09-06 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-7508:

Description: 
KinesisProducerLibrary (KPL) 0.10.x had been using a One-New-Thread-Per-Request 
model for all requests sent to AWS Kinesis, which is very expensive.

0.12.4 introduced a new [ThreadingMode - 
Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225],
 which will use a thread pool. This hugely improves KPL's performance and 
reduces consumed resources. By default, KPL still uses per-request mode. We 
should explicitly switch FlinkKinesisProducer's KPL threading mode to 'Pooled'.

This work depends on FLINK-7366 and FLINK-7508

Benchmarking I did:

* Environment: Running a Flink hourly-sliding windowing job on an 18-node EMR 
cluster with R4.2xlarge instances. Each hourly-sliding window in the Flink job 
generates about 21 million UserRecords, which means we generated a test load of 
21 million UserRecords in the first minute of each hour.
* Criteria: Test KPL throughput per minute. Since the default RecordTTL for KPL 
is 30 sec, we can be sure that either all UserRecords are sent by KPL within a 
minute, or we will see UserRecord expiration errors.
* One-New-Thread-Per-Request model: max throughput is about 2 million 
UserRecords per minute; it doesn't go beyond that because CPU utilization goes to 
100%, everything stops working, and the Flink job crashes.
* Thread-Pool model: it sends out 21 million UserRecords within 30 sec without 
any UserRecord expiration errors. The average peak CPU utilization is about 20% 
- 30%, so 21 million UserRecords/min is not the max throughput of the 
thread-pool model. We didn't go any further because 1) this throughput is 
already a couple of times more than what we really need, and 2) we don't have a 
quick way of increasing the test load.

Thus, I propose switching FlinkKinesisProducer to Thread-Pool mode. [~tzulitai] 
What do you think?





  was:
KinesisProducerLibrary (KPL) 0.10.x had been using a One-New-Thread-Per-Request 
model for all requests sent to AWS Kinesis, which is very expensive.

0.12.4 introduced a new [ThreadingMode - 
Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225],
 which will use a thread pool. This hugely improves KPL's performance and 
reduces consumed resources. By default, KPL still uses per-request mode. We 
should explicitly switch FlinkKinesisProducer's KPL threading mode to 'Pooled'.

This work depends on FLINK-7366 and FLINK-7508

Benchmarking I did:

* Environment: Running a Flink hourly-sliding windowing job on 18-node EMR 
cluster with R4.2xlarge instances. Each hourly-sliding window in the Flink job 
generates about 21million UserRecords, which means that we generated a test 
load of 21million UserRecords at the first minute of each hour.
* Criteria: 
* One-New-Thread-Per-Request model: max throughput is about 2million 
UserRecords per min; it doesn't go beyond that because CPU utilization goes to 
100%, everything stopped working and that Flink job crashed.
* Thread-Pool model: it sends out 21million UserRecords within one minute. The 
CPU utilization is about 






> switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode 
> rather than Per_Request mode
> 
>
> Key: FLINK-7508
> URL: https://issues.apache.org/jira/browse/FLINK-7508
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.4.0, 1.3.3
>
>
> KinesisProducerLibrary (KPL) 0.10.x had been using a 
> One-New-Thread-Per-Request model for all requests sent to AWS Kinesis, which 
> is very expensive.
> 0.12.4 introduced a new [ThreadingMode - 
> Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225],
>  which will use a thread pool. This hugely improves KPL's performance and 
> reduces consumed resources. By default, KPL still uses per-request mode. We 
> should explicitly switch FlinkKinesisProducer's KPL threading mode to 
> 'Pooled'.
> This work depends on FLINK-7366 and FLINK-7508
> Benchmarking I did:
> * Environment: Running a Flink hourly-sliding windowing job on 18-node EMR 
> cluster with R4.2xlarge instances. Each hourly-sliding window in the Flink 
> job generates about 21million UserRecords, which means that we generate

[jira] [Updated] (FLINK-7508) switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode rather than Per_Request mode

2017-09-06 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-7508:

Description: 
KinesisProducerLibrary (KPL) 0.10.x had been using a One-New-Thread-Per-Request 
model for all requests sent to AWS Kinesis, which is very expensive.

0.12.4 introduced a new [ThreadingMode - 
Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225],
 which will use a thread pool. This hugely improves KPL's performance and 
reduces consumed resources. By default, KPL still uses per-request mode. We 
should explicitly switch FlinkKinesisProducer's KPL threading mode to 'Pooled'.

This work depends on FLINK-7366 and FLINK-7508

Benchmarking I did:

* Environment: Running a Flink hourly-sliding windowing job on 18-node EMR 
cluster with R4.2xlarge instances. Each hourly-sliding window in the Flink job 
generates about 21million UserRecords, which means that we generated a test 
load of 21million UserRecords at the first minute of each hour.
* Criteria: 
* One-New-Thread-Per-Request model: max throughput is about 2million 
UserRecords per min; it doesn't go beyond that because CPU utilization goes to 
100%, everything stopped working and that Flink job crashed.
* Thread-Pool model: it sends out 21million UserRecords within one minute. The 
CPU utilization is about 





  was:
KinesisProducerLibrary (KPL) 0.10.x had been using a One-New-Thread-Per-Request 
model for all requests sent to AWS Kinesis, which is very expensive.

0.12.4 introduced a new [ThreadingMode - 
Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225],
 which will use a thread pool. This hugely improves KPL's performance and 
reduces consumed resources. By default, KPL still uses per-request mode. We 
should explicitly switch FlinkKinesisProducer's KPL threading mode to 'Pooled'.

This work depends on FLINK-7366 and FLINK-7508




> switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode 
> rather than Per_Request mode
> 
>
> Key: FLINK-7508
> URL: https://issues.apache.org/jira/browse/FLINK-7508
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.4.0, 1.3.3
>
>
> KinesisProducerLibrary (KPL) 0.10.x had been using a 
> One-New-Thread-Per-Request model for all requests sent to AWS Kinesis, which 
> is very expensive.
> 0.12.4 introduced a new [ThreadingMode - 
> Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225],
>  which will use a thread pool. This hugely improves KPL's performance and 
> reduces consumed resources. By default, KPL still uses per-request mode. We 
> should explicitly switch FlinkKinesisProducer's KPL threading mode to 
> 'Pooled'.
> This work depends on FLINK-7366 and FLINK-7508
> Benchmarking I did:
> * Environment: Running a Flink hourly-sliding windowing job on 18-node EMR 
> cluster with R4.2xlarge instances. Each hourly-sliding window in the Flink 
> job generates about 21million UserRecords, which means that we generated a 
> test load of 21million UserRecords at the first minute of each hour.
> * Criteria: 
> * One-New-Thread-Per-Request model: max throughput is about 2million 
> UserRecords per min; it doesn't go beyond that because CPU utilization goes 
> to 100%, everything stopped working and that Flink job crashed.
> * Thread-Pool model: it sends out 21million UserRecords within one minute. 
> The CPU utilization is about 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7407) Assumption of partition id strict contiguity is too naive in Kafka consumer's AbstractPartitionDiscoverer

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156466#comment-16156466
 ] 

ASF GitHub Bot commented on FLINK-7407:
---

Github user tzulitai closed the pull request at:

https://github.com/apache/flink/pull/4526


> Assumption of partition id strict contiguity is too naive in Kafka consumer's 
> AbstractPartitionDiscoverer
> -
>
> Key: FLINK-7407
> URL: https://issues.apache.org/jira/browse/FLINK-7407
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.4.0
>
>
> In the Kafka Consumer's {{AbstractPartitionDiscoverer}}, for partition 
> discovery, already discovered partitions are tracked with the following map:
> {code}
> Map topicsToLargestDiscoveredPartitionId
> {code}
> Simply put, on each discovery attempt's metadata fetch, all partition ids of 
> a given topic that are smaller than the largest seen id will be ignored and 
> not assigned. This approach relies on the assumption that fetched partition ids 
> of a single topic are always strictly contiguous starting from 0.
> This assumption may be too naive, in that partitions which were temporarily 
> unavailable at the time of a discovery would be shadowed by available 
> partitions with larger ids, and from then on would be left unassigned.
> We should redesign how the {{AbstractPartitionDiscoverer}} tracks discovered 
> partitions by not relying on the contiguity assumption, and also add test 
> cases for non-contiguous fetched partition ids.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156470#comment-16156470
 ] 

ASF GitHub Bot commented on FLINK-7367:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4473
  
No problem! And really sorry for the wait.
I'm waiting for a Travis run before merging: 
https://travis-ci.org/tzulitai/flink/builds/272758087?utm_source=github_status&utm_medium=notification


> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, 
> MaxConnections, RequestTimeout, etc)
> ---
>
> Key: FLINK-7367
> URL: https://issues.apache.org/jira/browse/FLINK-7367
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0, 1.3.3
>
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying 
> KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> According to the [AWS 
> doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] 
> and [their sample on 
> github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties],
> developers can set many more to make the best use of KinesisProducer and make 
> it fault-tolerant (e.g. by increasing the timeout).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> Flink currently uses KPL's default values, which make Flink write too fast to 
> Kinesis and fail the Flink job too frequently. We need to parameterize 
> FlinkKinesisProducer to pass in the above params, in order to slow down 
> Flink's write rate to Kinesis.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4526: [FLINK-7407] [kafka] Adapt AbstractPartitionDiscov...

2017-09-06 Thread tzulitai
Github user tzulitai closed the pull request at:

https://github.com/apache/flink/pull/4526


---


[GitHub] flink issue #4473: [FLINK-7367][kinesis connector] Parameterize more configs...

2017-09-06 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4473
  
No problem! And really sorry for the wait.
I'm waiting for a Travis run before merging: 
https://travis-ci.org/tzulitai/flink/builds/272758087?utm_source=github_status&utm_medium=notification


---


[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156468#comment-16156468
 ] 

ASF GitHub Bot commented on FLINK-7367:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4473
  
@tzulitai Thank you, Gordon!


> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, 
> MaxConnections, RequestTimeout, etc)
> ---
>
> Key: FLINK-7367
> URL: https://issues.apache.org/jira/browse/FLINK-7367
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0, 1.3.3
>
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying 
> KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> According to the [AWS 
> doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] 
> and [their sample on 
> github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties],
> developers can set many more to make the best use of KinesisProducer and make 
> it fault-tolerant (e.g. by increasing the timeout).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> Flink currently uses KPL's default values, which make Flink write too fast to 
> Kinesis and fail the Flink job too frequently. We need to parameterize 
> FlinkKinesisProducer to pass in the above params, in order to slow down 
> Flink's write rate to Kinesis.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4473: [FLINK-7367][kinesis connector] Parameterize more configs...

2017-09-06 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4473
  
@tzulitai Thank you, Gordon!


---


[GitHub] flink issue #4526: [FLINK-7407] [kafka] Adapt AbstractPartitionDiscoverer to...

2017-09-06 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4526
  
@aljoscha not yet, this is still mixed in one of my unmerged batches. 
Merging this now ..


---


[jira] [Commented] (FLINK-7407) Assumption of partition id strict contiguity is too naive in Kafka consumer's AbstractPartitionDiscoverer

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156465#comment-16156465
 ] 

ASF GitHub Bot commented on FLINK-7407:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4526
  
@aljoscha not yet, this is still mixed in one of my unmerged batches. 
Merging this now ..


> Assumption of partition id strict contiguity is too naive in Kafka consumer's 
> AbstractPartitionDiscoverer
> -
>
> Key: FLINK-7407
> URL: https://issues.apache.org/jira/browse/FLINK-7407
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.4.0
>
>
> In the Kafka Consumer's {{AbstractPartitionDiscoverer}}, for partition 
> discovery, already discovered partitions are tracked with the following map:
> {code}
> Map topicsToLargestDiscoveredPartitionId
> {code}
> Simply put, on each discovery attempt's metadata fetch, all partition ids of 
> a given topic that are smaller than the largest seen id will be ignored and 
> not assigned. This approach relies on the assumption that fetched partition ids 
> of a single topic are always strictly contiguous starting from 0.
> This assumption may be too naive, in that partitions which were temporarily 
> unavailable at the time of a discovery would be shadowed by available 
> partitions with larger ids, and from then on would be left unassigned.
> We should redesign how the {{AbstractPartitionDiscoverer}} tracks discovered 
> partitions by not relying on the contiguity assumption, and also add test 
> cases for non-contiguous fetched partition ids.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156439#comment-16156439
 ] 

ASF GitHub Bot commented on FLINK-7367:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4473
  
Hi @bowenli86, sorry about this. I had the commit ready to merge but was 
waiting for another test PR to be merged first. Merging this now!


> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, 
> MaxConnections, RequestTimeout, etc)
> ---
>
> Key: FLINK-7367
> URL: https://issues.apache.org/jira/browse/FLINK-7367
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0, 1.3.3
>
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying 
> KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> According to the [AWS 
> doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] 
> and [their sample on 
> github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties],
> developers can set many more to make the best use of KinesisProducer and make 
> it fault-tolerant (e.g. by increasing the timeout).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> Flink currently uses KPL's default values, which make Flink write too fast to 
> Kinesis and fail the Flink job too frequently. We need to parameterize 
> FlinkKinesisProducer to pass in the above params, in order to slow down 
> Flink's write rate to Kinesis.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4565: [FLINK-7429] [kinesis] Add migration test coverage...

2017-09-06 Thread tzulitai
Github user tzulitai closed the pull request at:

https://github.com/apache/flink/pull/4565


---


[jira] [Commented] (FLINK-7429) Add restore from 1.2 / 1.3 migration tests for FlinkKinesisConsumer

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156440#comment-16156440
 ] 

ASF GitHub Bot commented on FLINK-7429:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4565
  
@aljoscha thanks! Closing ..


> Add restore from 1.2 / 1.3 migration tests for FlinkKinesisConsumer
> ---
>
> Key: FLINK-7429
> URL: https://issues.apache.org/jira/browse/FLINK-7429
> Project: Flink
>  Issue Type: Test
>  Components: Kinesis Connector, Tests
>Affects Versions: 1.2.1, 1.4.0, 1.3.2
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> Currently, the `FlinkKinesisConsumerMigrationTest` only tests restore from 
> Flink 1.1.
> We should extend that to also verify restoring from 1.2 and 1.3.
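A rough sketch of how restore coverage for multiple previous versions is typically parameterized in a JUnit 4 migration test follows; the version strings and snapshot resource naming are illustrative, not the actual contents of {{FlinkKinesisConsumerMigrationTest}}:

{code:java}
// Hedged sketch: one parameterized test class covering restores from several
// older Flink versions, rather than a hard-coded 1.1-only test.
import java.util.Arrays;
import java.util.Collection;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class KinesisConsumerMigrationSketchTest {

    @Parameterized.Parameters(name = "restore from Flink {0}")
    public static Collection<Object[]> previousVersions() {
        return Arrays.asList(new Object[][] {{"1.1"}, {"1.2"}, {"1.3"}});
    }

    private final String migrateFromVersion;

    public KinesisConsumerMigrationSketchTest(String migrateFromVersion) {
        this.migrateFromVersion = migrateFromVersion;
    }

    @Test
    public void testRestoreFromPreviousVersionSnapshot() throws Exception {
        // Hypothetical resource name; a real test loads a snapshot written by the
        // given Flink version, restores the consumer from it, and asserts the
        // recovered shard-to-sequence-number state.
        String snapshotResource =
                "kinesis-consumer-migration-test-flink" + migrateFromVersion + "-snapshot";
        // ... restore and assert ...
    }
}
{code}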



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4565: [FLINK-7429] [kinesis] Add migration test coverage for Fl...

2017-09-06 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4565
  
@aljoscha thanks! Closing ..


---


[jira] [Commented] (FLINK-7429) Add restore from 1.2 / 1.3 migration tests for FlinkKinesisConsumer

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156441#comment-16156441
 ] 

ASF GitHub Bot commented on FLINK-7429:
---

Github user tzulitai closed the pull request at:

https://github.com/apache/flink/pull/4565


> Add restore from 1.2 / 1.3 migration tests for FlinkKinesisConsumer
> ---
>
> Key: FLINK-7429
> URL: https://issues.apache.org/jira/browse/FLINK-7429
> Project: Flink
>  Issue Type: Test
>  Components: Kinesis Connector, Tests
>Affects Versions: 1.2.1, 1.4.0, 1.3.2
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> Currently, the `FlinkKinesisConsumerMigrationTest` only tests restore from 
> Flink 1.1.
> We should extend that to also verify restoring from 1.2 and 1.3.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4473: [FLINK-7367][kinesis connector] Parameterize more configs...

2017-09-06 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4473
  
Hi @bowenli86, sorry about this. I had the commit ready to merge but was 
waiting for another test PR to be merged first. Merging this now!


---


[jira] [Updated] (FLINK-7508) switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode rather than Per_Request mode

2017-09-06 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-7508:

Description: 
KinesisProducerLibrary (KPL) 0.10.x had been using a One-New-Thread-Per-Request 
model for all requests sent to AWS Kinesis, which is very expensive.

0.12.4 introduced a new [ThreadingMode - 
Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225],
 which will use a thread pool. This hugely improves KPL's performance and 
reduces consumed resources. By default, KPL still uses per-request mode. We 
should explicitly switch FlinkKinesisProducer's KPL threading mode to 'Pooled'.

This work depends on FLINK-7366 and FLINK-7508



  was:
KinesisProducerLibrary (KPL) 0.10.x had been using a One-New-Thread-Per-Request 
model for all requests sent to AWS Kinesis, which is very expensive.

0.12.4 introduced a new [ThreadingMode - 
Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225],
 which will use a thread pool. This hugely improves KPL's performance and 
reduces consumed resources. By default, KPL still uses per-request mode. We 
should explicitly switch FlinkKinesisProducer's KPL threading mode to 'Pooled'.

This work depends on FLINK-7366 which upgrades KPL from 0.10 to 0.12.5.




> switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode 
> rather than Per_Request mode
> 
>
> Key: FLINK-7508
> URL: https://issues.apache.org/jira/browse/FLINK-7508
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.4.0, 1.3.3
>
>
> KinesisProducerLibrary (KPL) 0.10.x had been using a 
> One-New-Thread-Per-Request model for all requests sent to AWS Kinesis, which 
> is very expensive.
> 0.12.4 introduced a new [ThreadingMode - 
> Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225],
>  which will use a thread pool. This hugely improves KPL's performance and 
> reduces consumed resources. By default, KPL still uses per-request mode. We 
> should explicitly switch FlinkKinesisProducer's KPL threading mode to 
> 'Pooled'.
> This work depends on FLINK-7366 and FLINK-7508



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156424#comment-16156424
 ] 

ASF GitHub Bot commented on FLINK-7367:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4473
  
@tzulitai or other Flink committers, can you please merge this so I can 
submit more PRs that depend on it? Thanks!


> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, 
> MaxConnections, RequestTimeout, etc)
> ---
>
> Key: FLINK-7367
> URL: https://issues.apache.org/jira/browse/FLINK-7367
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0, 1.3.3
>
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying 
> KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> According to the [AWS 
> doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] 
> and [their sample on 
> github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties],
> developers can set many more to make the best use of KinesisProducer and make 
> it fault-tolerant (e.g. by increasing the timeout).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> Flink currently uses KPL's default values, which make Flink write too fast to 
> Kinesis and fail the Flink job too frequently. We need to parameterize 
> FlinkKinesisProducer to pass in the above params, in order to slow down 
> Flink's write rate to Kinesis.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4473: [FLINK-7367][kinesis connector] Parameterize more configs...

2017-09-06 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4473
  
@tzulitai or other Flink committers, can you please merge this so I can 
submit more PRs that depend on it? Thanks!


---


[jira] [Updated] (FLINK-7508) switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode rather than Per_Request mode

2017-09-06 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-7508:

Priority: Critical  (was: Major)

> switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode 
> rather than Per_Request mode
> 
>
> Key: FLINK-7508
> URL: https://issues.apache.org/jira/browse/FLINK-7508
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.4.0, 1.3.3
>
>
> KinesisProducerLibrary (KPL) 0.10.x had been using a 
> One-New-Thread-Per-Request model for all requests sent to AWS Kinesis, which 
> is very expensive.
> 0.12.4 introduced a new [ThreadingMode - 
> Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225],
>  which will use a thread pool. This hugely improves KPL's performance and 
> reduces consumed resources. By default, KPL still uses per-request mode. We 
> should explicitly switch FlinkKinesisProducer's KPL threading mode to 
> 'Pooled'.
> This work depends on FLINK-7366 which upgrades KPL from 0.10 to 0.12.5.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7590) Flink failed to flush and close the file system output stream for checkpointing because of s3 read timeout

2017-09-06 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156416#comment-16156416
 ] 

Bowen Li commented on FLINK-7590:
-

[~aljoscha] This also seems to be a bug in HadoopFS's S3 implementation. If we 
can't do anything about it, I'd suggest removing this as a blocker for 1.4.0.

> Flink failed to flush and close the file system output stream for 
> checkpointing because of s3 read timeout
> --
>
> Key: FLINK-7590
> URL: https://issues.apache.org/jira/browse/FLINK-7590
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> Flink job failed once over the weekend because of the following issue. It 
> picked itself up afterwards and has been running well. But the issue might 
> worth taking a look at.
> {code:java}
> 2017-09-03 13:18:38,998 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- reduce 
> (14/18) (c97256badc87e995d456e7a13cec5de9) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
> 163 for operator reduce (14/18).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 163 for 
> operator reduce (14/18).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not flush and close the file system output stream to 
> s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa in order to obtain the 
> stream state handle
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.io.IOException: Could not flush and close the file system output stream 
> to s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa in order to obtain 
> the stream state handle
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
>   ... 7 more
>   Caused by: java.io.IOException: Could not flush and close the file 
> system output stream to s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa 
> in order to obtain the stream state handle
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeSnapshotStreamAndGetHandle(RocksDBKeyedStateBackend.java:693)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeCheckpointStream(RocksDBKeyedStateBackend.java:531)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:420)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:399)
>   at 
> org.apache.flink.run

[jira] [Commented] (FLINK-7589) org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 159764230; received: 64638536)

2017-09-06 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156410#comment-16156410
 ] 

Bowen Li commented on FLINK-7589:
-

After some research online, the root cause seems to be that the AWS S3 client 
connection gets garbage collected while the client is still reading data. 

Reference: 
https://stackoverflow.com/questions/9952815/s3-java-client-fails-a-lot-with-premature-end-of-content-length-delimited-messa

I doubt there's actually anything we can do about it. Flink calls HadoopFS, 
and HadoopFS calls S3. As Flink developers, we can't touch the HadoopFS code. 
[~aljoscha] What do you think?

> org.apache.http.ConnectionClosedException: Premature end of Content-Length 
> delimited message body (expected: 159764230; received: 64638536)
> ---
>
> Key: FLINK-7589
> URL: https://issues.apache.org/jira/browse/FLINK-7589
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> When I tried to resume a Flink job from a savepoint with different 
> parallelism, I ran into this error. And the resume failed.
> {code:java}
> 2017-09-05 21:53:57,317 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- returnsivs -> 
> Sink: xxx (7/12) (0da16ec908fc7b9b16a5c2cf1aa92947) switched from RUNNING to 
> FAILED.
> org.apache.http.ConnectionClosedException: Premature end of Content-Length 
> delimited message body (expected: 159764230; received: 64638536
>   at 
> org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:180)
>   at 
> org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at 
> com.amazonaws.util.ContentLengthValidationInputStream.read(ContentLengthValidationInputStream.java:77)
>   at java.io.FilterInputStream.read(FilterInputStream.java:133)
>   at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:160)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at 
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:72)
>   at 
> org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:61)
>   at 
> org.apache.flink.runtime.util.NonClosingStreamDecorator.read(NonClosingStreamDecorator.java:47)
>   at java.io.DataInputStream.readFully(DataInputStream.java:195)
>   at java.io.DataInputStream.readLong(DataInputStream.java:416)
>   at 
> org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:68)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimer$TimerSerializer.deserialize(InternalTimer.java:156)
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.restoreTimersForKeyGroup(HeapInternalTimerService.java:345)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:141)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:496)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:104)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7521) Remove the 10MB limit from the current REST implementation.

2017-09-06 Thread Fang Yong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fang Yong reassigned FLINK-7521:


Assignee: Fang Yong

> Remove the 10MB limit from the current REST implementation.
> ---
>
> Key: FLINK-7521
> URL: https://issues.apache.org/jira/browse/FLINK-7521
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Fang Yong
>Priority: Blocker
> Fix For: 1.4.0
>
>
> In the current {{AbstractRestServer}} we impose an upper bound of 10MB on the 
> states we can transfer. This is in the line {{.addLast(new 
> HttpObjectAggregator(1024 * 1024 * 10))}} of the server implementation. 
> This limit is restrictive for some of the use cases planned to use this 
> implementation (e.g. the job submission client, which has to send full jars, 
> or the queryable state client, which may have to receive states bigger than 
> that).
> This issue proposes the elimination of this limit.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7594) Add a SQL CLI client

2017-09-06 Thread Kurt Young (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156356#comment-16156356
 ] 

Kurt Young commented on FLINK-7594:
---

+1 to this, it's a string improvement for usability
(y)

> Add a SQL CLI client
> 
>
> Key: FLINK-7594
> URL: https://issues.apache.org/jira/browse/FLINK-7594
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> At the moment a user can only specify queries within a Java/Scala program 
> which is nice for integrating table programs or parts of them with the DataSet or 
> DataStream API. With more connectors coming up, it is time to also provide a 
> programming-free SQL client. The SQL client should consist of a CLI interface 
> and maybe also a REST API. The concrete design is still up for discussion.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7594) Add a SQL CLI client

2017-09-06 Thread Kurt Young (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156356#comment-16156356
 ] 

Kurt Young edited comment on FLINK-7594 at 9/7/17 2:46 AM:
---

+1 to this, it's a strong improvement for usability
(y)


was (Author: ykt836):
+1 to this, it's a string improvement for usability
(y)

> Add a SQL CLI client
> 
>
> Key: FLINK-7594
> URL: https://issues.apache.org/jira/browse/FLINK-7594
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> At the moment a user can only specify queries within a Java/Scala program 
> which is nice for integrating table programs or parts of them with the DataSet or 
> DataStream API. With more connectors coming up, it is time to also provide a 
> programming-free SQL client. The SQL client should consist of a CLI interface 
> and maybe also a REST API. The concrete design is still up for discussion.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7521) Remove the 10MB limit from the current REST implementation.

2017-09-06 Thread Fang Yong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156352#comment-16156352
 ] 

Fang Yong commented on FLINK-7521:
--

It might be better to add a config option to set this value instead of removing 
it. What do you think? Thanks [~kkl0u]
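
For illustration, below is a minimal sketch of what a configurable limit could look like instead of removing it entirely. The option key {{rest.server.max-content-length}}, the default value, and the helper class are hypothetical assumptions for discussion, not existing Flink API (Flink's REST code also uses a shaded Netty package rather than the plain {{io.netty}} import shown here).

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

import io.netty.handler.codec.http.HttpObjectAggregator;

public class RestContentLengthSketch {

	// Hypothetical option: key and default are assumptions, not an existing Flink setting.
	public static final ConfigOption<Integer> MAX_CONTENT_LENGTH =
		ConfigOptions.key("rest.server.max-content-length")
			.defaultValue(10 * 1024 * 1024);

	// Would replace the hard-coded `new HttpObjectAggregator(1024 * 1024 * 10)` in the pipeline setup.
	public static HttpObjectAggregator createAggregator(Configuration config) {
		return new HttpObjectAggregator(config.getInteger(MAX_CONTENT_LENGTH));
	}
}
{code}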

> Remove the 10MB limit from the current REST implementation.
> ---
>
> Key: FLINK-7521
> URL: https://issues.apache.org/jira/browse/FLINK-7521
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.4.0
>
>
> In the current {{AbstractRestServer}} we impose an upper bound of 10MB in the 
> states we can transfer. This is in the line {{.addLast(new 
> HttpObjectAggregator(1024 * 1024 * 10))}} of the server implementation. 
> This limit is restrictive for some of the use cases planned to use this 
> implementation (e.g. the job submission client which has to send full jars, 
> or the queryable state client which may have to receive states bigger than 
> that).
> This issue proposes the elimination of this limit.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7590) Flink failed to flush and close the file system output stream for checkpointing because of s3 read timeout

2017-09-06 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156347#comment-16156347
 ] 

Bowen Li commented on FLINK-7590:
-

According to [1], this seems to be fixable by the aws-sdk version used. But I 
can't find where Flink brings in the aws-sdk except in flink-connectors-kinesis.

[1] https://github.com/aws/aws-sdk-java/issues/1174
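
For anyone hitting the same read timeout, one possible mitigation (independent of the aws-sdk version question) is to raise the S3A client's socket timeout and retry count through the Hadoop configuration that Flink's S3 filesystem picks up. The keys below are standard Hadoop S3A properties, but they should be verified against the Hadoop version in use; the values are arbitrary examples, not recommendations.

{code:xml}
<!-- core-site.xml: example values only; verify the keys against your Hadoop version -->
<configuration>
  <property>
    <name>fs.s3a.connection.timeout</name>
    <value>600000</value> <!-- socket timeout in milliseconds -->
  </property>
  <property>
    <name>fs.s3a.attempts.maximum</name>
    <value>20</value> <!-- how often the S3A client retries a failed request -->
  </property>
</configuration>
{code}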

> Flink failed to flush and close the file system output stream for 
> checkpointing because of s3 read timeout
> --
>
> Key: FLINK-7590
> URL: https://issues.apache.org/jira/browse/FLINK-7590
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> Flink job failed once over the weekend because of the following issue. It 
> picked itself up afterwards and has been running well. But the issue might 
> be worth taking a look at.
> {code:java}
> 2017-09-03 13:18:38,998 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- reduce 
> (14/18) (c97256badc87e995d456e7a13cec5de9) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
> 163 for operator reduce (14/18).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 163 for 
> operator reduce (14/18).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not flush and close the file system output stream to 
> s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa in order to obtain the 
> stream state handle
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.io.IOException: Could not flush and close the file system output stream 
> to s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa in order to obtain 
> the stream state handle
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
>   ... 7 more
>   Caused by: java.io.IOException: Could not flush and close the file 
> system output stream to s3://xxx/chk-163/dcb9e1df-78e0-444a-9646-7701b25c1aaa 
> in order to obtain the stream state handle
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeSnapshotStreamAndGetHandle(RocksDBKeyedStateBackend.java:693)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeCheckpointStream(RocksDBKeyedStateBackend.java:531)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:420)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:399)

[jira] [Updated] (FLINK-7596) Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType)

2017-09-06 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-7596:
--
Description: 
If two inputs with Any(GenericRelDataType) are combined in a set operation 
(Union, Minus, ...), it causes a {{TableException}} with the message 
"Type is not supported: ANY"
Here is the test case:

`
@Test
  def testUnion(): Unit = {
val list = List((1, new NODE), (2, new NODE))
val list2 = List((3, new NODE), (4, new NODE))
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val s1 = tEnv.fromDataStream(env.fromCollection(list))
val s2 = tEnv.fromDataStream(env.fromCollection(list2))
val result = s1.unionAll(s2).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
  }

  class NODE {
  val x = new util.HashMap[String, String]()
}
`

This bug happens because Flink didn't handle createSqlType(ANY) and Calcite 
doesn't know the difference between {{ANY}} and {{ANY(GenericRelDataType)}}, so 
Calcite's {{createSqlType(ANY)}} returns a BasicSqlType instead.

  was:
If two inputs with Any(GenericRelDataType) are combined in a set operation 
(Union, Minus, ...), it causes a {{TableException}} with the message 
"Type is not supported: ANY"
Here is the test case:
```
@Test
  def testUnion(): Unit = {
val list = List((1, new NODE), (2, new NODE))
val list2 = List((3, new NODE), (4, new NODE))
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val s1 = tEnv.fromDataStream(env.fromCollection(list))
val s2 = tEnv.fromDataStream(env.fromCollection(list2))
val result = s1.unionAll(s2).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
  }

  class NODE {
  val x = new util.HashMap[String, String]()
}
```
This bug happens because Flink didn't handle createSqlType(ANY) and Calcite 
doesn't know the difference between {{ANY}} and {{ANY(GenericRelDataType)}}, so 
Calcite's {{createSqlType(ANY)}} returns a BasicSqlType instead.


> Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType) 
> --
>
> Key: FLINK-7596
> URL: https://issues.apache.org/jira/browse/FLINK-7596
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> If two inputs with Any(GenericRelDataType) are combined in a set operation 
> (Union, Minus, ...), it causes a {{TableException}} with the message 
> "Type is not supported: ANY"
> Here is the test case:
> `
> @Test
>   def testUnion(): Unit = {
> val list = List((1, new NODE), (2, new NODE))
> val list2 = List((3, new NODE), (4, new NODE))
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val s1 = tEnv.fromDataStream(env.fromCollection(list))
> val s2 = tEnv.fromDataStream(env.fromCollection(list2))
> val result = s1.unionAll(s2).toAppendStream[Row]
> result.addSink(new StreamITCase.StringSink[Row])
> env.execute()
>   }
>   class NODE {
>   val x = new util.HashMap[String, String]()
> }
> `
> This bug happens because Flink didn't handle createSqlType(ANY) and Calcite 
> doesn't know the difference between {{ANY}} and {{ANY(GenericRelDataType)}}, 
> so Calcite's {{createSqlType(ANY)}} returns a BasicSqlType instead.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7596) Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType)

2017-09-06 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-7596:
--
Description: 
If two inputs with Any(GenericRelDataType) are combined in a set operation 
(Union, Minus, ...), it causes a {{TableException}} with the message 
"Type is not supported: ANY"
Here is the test case:
```
@Test
  def testUnion(): Unit = {
val list = List((1, new NODE), (2, new NODE))
val list2 = List((3, new NODE), (4, new NODE))
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val s1 = tEnv.fromDataStream(env.fromCollection(list))
val s2 = tEnv.fromDataStream(env.fromCollection(list2))
val result = s1.unionAll(s2).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
  }

  class NODE {
  val x = new util.HashMap[String, String]()
}
```
This bug happens because Flink didn't handle createSqlType(ANY) and Calcite 
doesn't know the difference between {{ANY}} and {{ANY(GenericRelDataType)}}, so 
Calcite's {{createSqlType(ANY)}} returns a BasicSqlType instead.

  was:
If two inputs with Any(GenericRelDataType) are combined in a set operation 
(Union, Minus, ...), it causes a {{TableException}} with the message 
"Type is not supported: ANY"
Here is the test case:
`
@Test
  def testUnion(): Unit = {
val list = List((1, new NODE), (2, new NODE))
val list2 = List((3, new NODE), (4, new NODE))
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val s1 = tEnv.fromDataStream(env.fromCollection(list))
val s2 = tEnv.fromDataStream(env.fromCollection(list2))
val result = s1.unionAll(s2).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
  }

  class NODE {
  val x = new util.HashMap[String, String]()
}
`
This bug happens because Flink didn't handle createSqlType(ANY) and Calcite 
doesn't know the difference between {{ANY}} and {{ANY(GenericRelDataType)}}, so 
Calcite's {{createSqlType(ANY)}} returns a BasicSqlType instead.


> Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType) 
> --
>
> Key: FLINK-7596
> URL: https://issues.apache.org/jira/browse/FLINK-7596
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> If two inputs with Any(GenericRelDataType) are combined in a set operation 
> (Union, Minus, ...), it causes a {{TableException}} with the message 
> "Type is not supported: ANY"
> Here is the test case:
> ```
> @Test
>   def testUnion(): Unit = {
> val list = List((1, new NODE), (2, new NODE))
> val list2 = List((3, new NODE), (4, new NODE))
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val s1 = tEnv.fromDataStream(env.fromCollection(list))
> val s2 = tEnv.fromDataStream(env.fromCollection(list2))
> val result = s1.unionAll(s2).toAppendStream[Row]
> result.addSink(new StreamITCase.StringSink[Row])
> env.execute()
>   }
>   class NODE {
>   val x = new util.HashMap[String, String]()
> }
> ```
> This bug happens because Flink didn't handle createSqlType(ANY) and Calcite 
> doesn't know the difference between {{ANY}} and {{ANY(GenericRelDataType)}}, 
> so Calcite's {{createSqlType(ANY)}} returns a BasicSqlType instead.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7596) Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType)

2017-09-06 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-7596:
--
Description: 
If two inputs with Any(GenericRelDataType) are combined in a set operation 
(Union, Minus, ...), it causes a {{TableException}} with the message 
"Type is not supported: ANY"
Here is the test case:
`
@Test
  def testUnion(): Unit = {
val list = List((1, new NODE), (2, new NODE))
val list2 = List((3, new NODE), (4, new NODE))
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val s1 = tEnv.fromDataStream(env.fromCollection(list))
val s2 = tEnv.fromDataStream(env.fromCollection(list2))
val result = s1.unionAll(s2).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
  }

  class NODE {
  val x = new util.HashMap[String, String]()
}
`
This bug happens because Flink didn't handle createSqlType(ANY) and Calcite 
doesn't know the difference between {{ANY}} and {{ANY(GenericRelDataType)}}, so 
Calcite's {{createSqlType(ANY)}} returns a BasicSqlType instead.

  was:
If two inputs with Any(GenericRelDataType) are combined in a set operation 
(Union, Minus, ...), it causes a {{TableException}} with the message 
"Type is not supported: ANY"
Here is the test case:

@Test
  def testUnion(): Unit = {
val list = List((1, new NODE), (2, new NODE))
val list2 = List((3, new NODE), (4, new NODE))
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val s1 = tEnv.fromDataStream(env.fromCollection(list))
val s2 = tEnv.fromDataStream(env.fromCollection(list2))
val result = s1.unionAll(s2).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
  }

  class NODE {
  val x = new util.HashMap[String, String]()
}

This bug happens because Flink didn't handle createSqlType(ANY) and Calcite 
doesn't know the difference between {{ANY}} and {{ANY(GenericRelDataType)}}, so 
Calcite's {{createSqlType(ANY)}} returns a BasicSqlType instead.


> Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType) 
> --
>
> Key: FLINK-7596
> URL: https://issues.apache.org/jira/browse/FLINK-7596
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> If two inputs with Any(GenericRelDataType) are combined in a set operation 
> (Union, Minus, ...), it causes a {{TableException}} with the message 
> "Type is not supported: ANY"
> Here is the test case:
> `
> @Test
>   def testUnion(): Unit = {
> val list = List((1, new NODE), (2, new NODE))
> val list2 = List((3, new NODE), (4, new NODE))
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val s1 = tEnv.fromDataStream(env.fromCollection(list))
> val s2 = tEnv.fromDataStream(env.fromCollection(list2))
> val result = s1.unionAll(s2).toAppendStream[Row]
> result.addSink(new StreamITCase.StringSink[Row])
> env.execute()
>   }
>   class NODE {
>   val x = new util.HashMap[String, String]()
> }
> `
> This bug happens because Flink didn't handle createSqlType(ANY) and Calcite 
> doesn't know the difference between {{ANY}} and {{ANY(GenericRelDataType)}}, 
> so Calcite's {{createSqlType(ANY)}} returns a BasicSqlType instead.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7596) Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType)

2017-09-06 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-7596:
--
Description: 
If two inputs with Any(GenericRelDataType) are combined in a set operation 
(Union, Minus, ...), it causes a {{TableException}} with the message 
"Type is not supported: ANY"
Here is the test case:

@Test
  def testUnion(): Unit = {
val list = List((1, new NODE), (2, new NODE))
val list2 = List((3, new NODE), (4, new NODE))
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val s1 = tEnv.fromDataStream(env.fromCollection(list))
val s2 = tEnv.fromDataStream(env.fromCollection(list2))
val result = s1.unionAll(s2).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
  }

  class NODE {
  val x = new util.HashMap[String, String]()
}

This bug happens because Flink didn't handle createSqlType(ANY) and Calcite 
doesn't know the difference between {{ANY}} and {{ANY(GenericRelDataType)}}, so 
Calcite's {{createSqlType(ANY)}} returns a BasicSqlType instead.

  was:
If two inputs with Any(GenericRelDataType) are combined in a set operation 
(Union, Minus, ...), it causes a {{TableException}} with the message 
"Type is not supported: ANY"
Here is the test case:
{{
@Test
  def testUnion(): Unit = {
val list = List((1, new NODE), (2, new NODE))
val list2 = List((3, new NODE), (4, new NODE))
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val s1 = tEnv.fromDataStream(env.fromCollection(list))
val s2 = tEnv.fromDataStream(env.fromCollection(list2))
val result = s1.unionAll(s2).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
  }

  class NODE {
  val x = new util.HashMap[String, String]()
}
}}

This bug happens because Flink didn't handle createSqlType(ANY) and Calcite 
doesn't know the difference between {{ANY}} and {{ANY(GenericRelDataType)}}, so 
Calcite's {{createSqlType(ANY)}} returns a BasicSqlType instead.


> Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType) 
> --
>
> Key: FLINK-7596
> URL: https://issues.apache.org/jira/browse/FLINK-7596
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> If two inputs with Any(GenericRelDataType) are combined in a set operation 
> (Union, Minus, ...), it causes a {{TableException}} with the message 
> "Type is not supported: ANY"
> Here is the test case:
> @Test
>   def testUnion(): Unit = {
> val list = List((1, new NODE), (2, new NODE))
> val list2 = List((3, new NODE), (4, new NODE))
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val s1 = tEnv.fromDataStream(env.fromCollection(list))
> val s2 = tEnv.fromDataStream(env.fromCollection(list2))
> val result = s1.unionAll(s2).toAppendStream[Row]
> result.addSink(new StreamITCase.StringSink[Row])
> env.execute()
>   }
>   class NODE {
>   val x = new util.HashMap[String, String]()
> }
> This bug happens because Flink didn't handle createSqlType(ANY) and Calcite 
> doesn't know the difference between {{ANY}} and {{ANY(GenericRelDataType)}}, 
> so Calcite's {{createSqlType(ANY)}} returns a BasicSqlType instead.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7596) Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType)

2017-09-06 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-7596:
--
Description: 
If two inputs with Any(GenericRelDataType) are combined in a set operation 
(Union, Minus, ...), it causes a {{TableException}} with the message 
"Type is not supported: ANY"
Here is the test case:
{{
@Test
  def testUnion(): Unit = {
val list = List((1, new NODE), (2, new NODE))
val list2 = List((3, new NODE), (4, new NODE))
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val s1 = tEnv.fromDataStream(env.fromCollection(list))
val s2 = tEnv.fromDataStream(env.fromCollection(list2))
val result = s1.unionAll(s2).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
  }

  class NODE {
  val x = new util.HashMap[String, String]()
}
}}

This bug happens because Flink didn't handle createSqlType(ANY) and Calcite 
doesn't know the difference between {{ANY}} and {{ANY(GenericRelDataType)}}, so 
Calcite's {{createSqlType(ANY)}} returns a BasicSqlType instead.

  was:
If two inputs with Any(GenericRelDataType) are combined in a set operation 
(Union, Minus, ...), it causes a `TableException` with the message "Type 
is not supported: ANY"
Here is the test case:
`
@Test
  def testUnion(): Unit = {
val list = List((1, new NODE), (2, new NODE))
val list2 = List((3, new NODE), (4, new NODE))
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val s1 = tEnv.fromDataStream(env.fromCollection(list))
val s2 = tEnv.fromDataStream(env.fromCollection(list2))
val result = s1.unionAll(s2).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
  }

  class NODE {
  val x = new util.HashMap[String, String]()
}
`

This bug happens because Flink didn't handle createSqlType(ANY) and Calcite 
doesn't know the difference between `ANY` and `ANY(GenericRelDataType)`, so 
Calcite's `createSqlType(ANY)` returns a BasicSqlType instead.


> Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType) 
> --
>
> Key: FLINK-7596
> URL: https://issues.apache.org/jira/browse/FLINK-7596
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> If two inputs with Any(GenericRelDataType) are combined in a set operation 
> (Union, Minus, ...), it causes a {{TableException}} with the message 
> "Type is not supported: ANY"
> Here is the test case:
> {{
> @Test
>   def testUnion(): Unit = {
> val list = List((1, new NODE), (2, new NODE))
> val list2 = List((3, new NODE), (4, new NODE))
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val s1 = tEnv.fromDataStream(env.fromCollection(list))
> val s2 = tEnv.fromDataStream(env.fromCollection(list2))
> val result = s1.unionAll(s2).toAppendStream[Row]
> result.addSink(new StreamITCase.StringSink[Row])
> env.execute()
>   }
>   class NODE {
>   val x = new util.HashMap[String, String]()
> }
> }}
> This bug happens because Flink didn't handle createSqlType(ANY) and Calcite 
> doesn't know the difference between {{ANY}} and {{ANY(GenericRelDataType)}}, 
> so Calcite's {{createSqlType(ANY)}} returns a BasicSqlType instead.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7596) Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType)

2017-09-06 Thread Ruidong Li (JIRA)
Ruidong Li created FLINK-7596:
-

 Summary: Fix bug during Set Operation (Union, Minus ... ) with 
Any(GenericRelDataType) 
 Key: FLINK-7596
 URL: https://issues.apache.org/jira/browse/FLINK-7596
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Ruidong Li
Assignee: Ruidong Li


If two inputs with Any(GenericRelDataType) are combined in a set operation 
(Union, Minus, ...), it causes a `TableException` with the message "Type 
is not supported: ANY"
Here is the test case:
`
@Test
  def testUnion(): Unit = {
val list = List((1, new NODE), (2, new NODE))
val list2 = List((3, new NODE), (4, new NODE))
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val s1 = tEnv.fromDataStream(env.fromCollection(list))
val s2 = tEnv.fromDataStream(env.fromCollection(list2))
val result = s1.unionAll(s2).toAppendStream[Row]
result.addSink(new StreamITCase.StringSink[Row])
env.execute()
  }

  class NODE {
  val x = new util.HashMap[String, String]()
}
`

This bug happens because Flink didn't handle createSqlType(ANY) and Calcite 
doesn't know the difference between `ANY` and `ANY(GenericRelDataType)`, so 
Calcite's `createSqlType(ANY)` returns a BasicSqlType instead.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI&SQL

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156251#comment-16156251
 ] 

ASF GitHub Bot commented on FLINK-7465:
---

GitHub user sunjincheng121 opened a pull request:

https://github.com/apache/flink/pull/4652

[FLINK-7465][table]Add cardinality count for tableAPI and SQL.

## What is the purpose of the change

*In this PR we want to add CARDINALITY_COUNT for the Table API and SQL, using the 
`HyperLogLog` algorithm. 
The implementation of the HyperLogLog (HLL) algorithm follows this paper: 
http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf 
As we know, there are improved variants such as HyperLogLog++, HyperBitBit, etc., 
but `HyperLogLog` is a classic algorithm that has been extensively verified, so I 
chose it as the first version of the cardinality count. We can improve the 
algorithm later if we need to.
*

## Brief change log
  - *Add a Java implementation of `HyperLogLog` (based on stream-lib)*
  - *Add MURMURHASH (see http://murmurhash.googlepages.com/)*
  - *Add a built-in `CardinalityCountAggFunction`*
  - *Add test cases for validation*
  - *Add documentation for TableAPI&SQL*

## Verifying this change
This change added tests and can be verified as follows:
  - *Added SQL/TableAPI integration tests for `cardinality_count`*
  - *Added a `CardinalityCountAggFunctionTest` test case to verify the AGG logic.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs / JavaDocs)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sunjincheng121/flink FLINK-7465-PR

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4652.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4652


commit bc1166ad88538bdcdd6df685c750359aadff3950
Author: 金竹 
Date:   2017-09-05T10:21:10Z

[FLINK-7465][table]Add cardinality count for tableAPI and SQL.




> Add build-in BloomFilterCount on TableAPI&SQL
> -
>
> Key: FLINK-7465
> URL: https://issues.apache.org/jira/browse/FLINK-7465
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
> Attachments: bloomfilter.png
>
>
> In this JIRA, we use a BloomFilter to implement counting functions.
> BloomFilter Algorithm description:
> An empty Bloom filter is a bit array of m bits, all set to 0. There must also 
> be k different hash functions defined, each of which maps or hashes some set 
> element to one of the m array positions, generating a uniform random 
> distribution. Typically, k is a constant, much smaller than m, which is 
> proportional to the number of elements to be added; the precise choice of k 
> and the constant of proportionality of m are determined by the intended false 
> positive rate of the filter.
> To add an element, feed it to each of the k hash functions to get k array 
> positions. Set the bits at all these positions to 1.
> To query for an element (test whether it is in the set), feed it to each of 
> the k hash functions to get k array positions. If any of the bits at these 
> positions is 0, the element is definitely not in the set – if it were, then 
> all the bits would have been set to 1 when it was inserted. If all are 1, 
> then either the element is in the set, or the bits have by chance been set to 
> 1 during the insertion of other elements, resulting in a false positive.
> An example of a Bloom filter, representing the set {x, y, z}. The colored 
> arrows show the positions in the bit array that each set element is mapped 
> to. The element w is not in the set {x, y, z}, because it hashes to one 
> bit-array position containing 0. For this figure, m = 18 and k = 3. The 
> sketch as follows:
> !bloomfilter.png!
> Reference:
> 1. https://en.wikipedia.org/wiki/Bloom_filter
> 2. 
> https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
> Hi [~fhueske] [~tw
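
As an editorial illustration of the add/query mechanics described above, here is a small, self-contained sketch. The class name and the double-hashing scheme are assumptions chosen for brevity; this is not the implementation proposed in this issue.

{code:java}
import java.util.BitSet;

public class SimpleBloomFilter {

	private final BitSet bits; // the m-bit array
	private final int m;       // number of bits
	private final int k;       // number of hash functions

	public SimpleBloomFilter(int m, int k) {
		this.bits = new BitSet(m);
		this.m = m;
		this.k = k;
	}

	// Derive the i-th of k positions from two base hashes (double hashing).
	private int position(Object element, int i) {
		int h1 = element.hashCode();
		int h2 = Integer.rotateLeft(h1, 16) ^ 0x9E3779B9;
		return Math.floorMod(h1 + i * h2, m);
	}

	// Set the k positions of the element to 1.
	public void add(Object element) {
		for (int i = 0; i < k; i++) {
			bits.set(position(element, i));
		}
	}

	// false = definitely not in the set; true = probably in the set (false positives possible).
	public boolean mightContain(Object element) {
		for (int i = 0; i < k; i++) {
			if (!bits.get(position(element, i))) {
				return false;
			}
		}
		return true;
	}
}
{code}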

[GitHub] flink pull request #4652: [FLINK-7465][table]Add cardinality count for table...

2017-09-06 Thread sunjincheng121
GitHub user sunjincheng121 opened a pull request:

https://github.com/apache/flink/pull/4652

[FLINK-7465][table]Add cardinality count for tableAPI and SQL.

## What is the purpose of the change

*In this PR we want to add CARDINALITY_COUNT for the Table API and SQL, using the 
`HyperLogLog` algorithm. 
The implementation of the HyperLogLog (HLL) algorithm follows this paper: 
http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf 
As we know, there are improved variants such as HyperLogLog++, HyperBitBit, etc., 
but `HyperLogLog` is a classic algorithm that has been extensively verified, so I 
chose it as the first version of the cardinality count. We can improve the 
algorithm later if we need to.
*

## Brief change log
  - *Add a Java implementation of `HyperLogLog` (based on stream-lib)*
  - *Add MURMURHASH (see http://murmurhash.googlepages.com/)*
  - *Add a built-in `CardinalityCountAggFunction`*
  - *Add test cases for validation*
  - *Add documentation for TableAPI&SQL*

## Verifying this change
This change added tests and can be verified as follows:
  - *Added SQL/TableAPI integration tests for `cardinality_count`*
  - *Added a `CardinalityCountAggFunctionTest` test case to verify the AGG logic.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs / JavaDocs)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sunjincheng121/flink FLINK-7465-PR

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4652.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4652


commit bc1166ad88538bdcdd6df685c750359aadff3950
Author: 金竹 
Date:   2017-09-05T10:21:10Z

[FLINK-7465][table]Add cardinality count for tableAPI and SQL.
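
To make the HyperLogLog idea from the PR description concrete, here is a compact, simplified sketch: fixed precision, no small- or large-range corrections, and a toy hash smearing step. All names and constants are illustrative assumptions and are not taken from the pull request.

{code:java}
public class MiniHyperLogLog {

	private final int p;            // precision: number of index bits
	private final int m;            // number of registers = 2^p
	private final byte[] registers; // max "rank" observed per register

	public MiniHyperLogLog(int p) {
		this.p = p;
		this.m = 1 << p;
		this.registers = new byte[m];
	}

	public void add(Object value) {
		int hash = smear(value.hashCode());
		int idx = hash >>> (Integer.SIZE - p);   // the first p bits pick a register
		int rest = hash << p;                    // the remaining bits
		int rank = Math.min(Integer.numberOfLeadingZeros(rest) + 1, Integer.SIZE - p + 1);
		if (rank > registers[idx]) {
			registers[idx] = (byte) rank;
		}
	}

	public long cardinality() {
		double alpha = 0.7213 / (1 + 1.079 / m); // bias correction constant for large m
		double sum = 0.0;
		for (byte r : registers) {
			sum += 1.0 / (1L << r);              // harmonic mean of 2^register values
		}
		return Math.round(alpha * m * m / sum);
	}

	private static int smear(int h) {
		// simple avalanche step so low-quality hashCodes spread across registers
		h ^= (h >>> 16);
		h *= 0x85ebca6b;
		h ^= (h >>> 13);
		return h;
	}
}
{code}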




---


[jira] [Commented] (FLINK-7594) Add a SQL CLI client

2017-09-06 Thread Haohui Mai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156221#comment-16156221
 ] 

Haohui Mai commented on FLINK-7594:
---

We internally have a project (AthenaX) for this requirement and we are in the 
process of open-sourcing it. We are happy to contribute it directly to the 
Flink repository as well.


> Add a SQL CLI client
> 
>
> Key: FLINK-7594
> URL: https://issues.apache.org/jira/browse/FLINK-7594
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> At the moment a user can only specify queries within a Java/Scala program 
> which is nice for integrating table programs or parts of them with the DataSet or 
> DataStream API. With more connectors coming up, it is time to also provide a 
> programming-free SQL client. The SQL client should consist of a CLI interface 
> and maybe also a REST API. The concrete design is still up for discussion.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7124) Allow to rescale JobGraph on JobManager

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156061#comment-16156061
 ] 

ASF GitHub Bot commented on FLINK-7124:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/4510#discussion_r137395149
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
 ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import 
org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This class contains tests that verify when rescaling a {@link JobGraph},
+ * constructed {@link ExecutionGraph}s are correct.
+ */
+public class ExecutionGraphRescalingTest {
+
+   private static final Logger TEST_LOGGER = 
LoggerFactory.getLogger(ExecutionGraphRescalingTest.class);
+
+   @Test
+   public void testExecutionGraphArbitraryDopConstructionTest() throws 
Exception {
+
+   final Configuration config = new Configuration();
+
+   final JobVertex[] jobVertices = 
createVerticesForSimpleBipartiteJobGraph();
+   final JobGraph jobGraph = new JobGraph(jobVertices);
+
+   // TODO rescaling the JobGraph is currently only supported if 
the
+   // TODO configured parallelism is 
ExecutionConfig.PARALLELISM_AUTO_MAX.
+   // TODO this limitation should be removed.
+   for (JobVertex jv : jobVertices) {
+   jv.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+   }
+
+   ExecutionGraph eg = ExecutionGraphBuilder.buildGraph(
+   null,
+   jobGraph,
+   config,
+   TestingUtils.defaultExecutor(),
+   TestingUtils.defaultExecutor(),
+   new 
Scheduler(TestingUtils.defaultExecutionContext()),
+   Thread.currentThread().getContextClassLoader(),
+   new StandaloneCheckpointRecoveryFactory(),
+   AkkaUtils.getDefaultTimeout(),
+   new NoRestartStrategy(),
+   new UnregisteredMetricsGroup(),
+   5,
+   TEST_LOGGER);
+
+   for (JobVertex jv : jobVertices) {
+   assertEquals(5, jv.getParallelism());
+   }
+   verifyGeneratedExecutionGraphOfSimpleBitartiteJobGraph(eg, 
jobVertices);
+
+   // --- verify scaling up works correctly ---
+
+   // TODO rescaling the JobGraph is currently only supported if 
the
+   // TODO configured parallelism is 
ExecutionCon

[GitHub] flink pull request #4510: [FLINK-7124] [flip-6] Add test to verify rescaling...

2017-09-06 Thread EronWright
Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/4510#discussion_r137395149
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
 ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import 
org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This class contains tests that verify when rescaling a {@link JobGraph},
+ * constructed {@link ExecutionGraph}s are correct.
+ */
+public class ExecutionGraphRescalingTest {
+
+   private static final Logger TEST_LOGGER = 
LoggerFactory.getLogger(ExecutionGraphRescalingTest.class);
+
+   @Test
+   public void testExecutionGraphArbitraryDopConstructionTest() throws 
Exception {
+
+   final Configuration config = new Configuration();
+
+   final JobVertex[] jobVertices = 
createVerticesForSimpleBipartiteJobGraph();
+   final JobGraph jobGraph = new JobGraph(jobVertices);
+
+   // TODO rescaling the JobGraph is currently only supported if 
the
+   // TODO configured parallelism is 
ExecutionConfig.PARALLELISM_AUTO_MAX.
+   // TODO this limitation should be removed.
+   for (JobVertex jv : jobVertices) {
+   jv.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+   }
+
+   ExecutionGraph eg = ExecutionGraphBuilder.buildGraph(
+   null,
+   jobGraph,
+   config,
+   TestingUtils.defaultExecutor(),
+   TestingUtils.defaultExecutor(),
+   new 
Scheduler(TestingUtils.defaultExecutionContext()),
+   Thread.currentThread().getContextClassLoader(),
+   new StandaloneCheckpointRecoveryFactory(),
+   AkkaUtils.getDefaultTimeout(),
+   new NoRestartStrategy(),
+   new UnregisteredMetricsGroup(),
+   5,
+   TEST_LOGGER);
+
+   for (JobVertex jv : jobVertices) {
+   assertEquals(5, jv.getParallelism());
+   }
+   verifyGeneratedExecutionGraphOfSimpleBitartiteJobGraph(eg, 
jobVertices);
+
+   // --- verify scaling up works correctly ---
+
+   // TODO rescaling the JobGraph is currently only supported if 
the
+   // TODO configured parallelism is 
ExecutionConfig.PARALLELISM_AUTO_MAX.
+   // TODO this limitation should be removed.
+   for (JobVertex jv : jobVertices) {
+   jv.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+   }
+
+  

[jira] [Commented] (FLINK-7430) ContinuousFileReaderOperator swallows exceptions

2017-09-06 Thread Peter Ertl (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155874#comment-16155874
 ] 

Peter Ertl commented on FLINK-7430:
---

Thanks for your great help guys! :)

> ContinuousFileReaderOperator swallows exceptions
> 
>
> Key: FLINK-7430
> URL: https://issues.apache.org/jira/browse/FLINK-7430
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, filesystem-connector
>Affects Versions: 1.4.0, 1.3.2
> Environment: - macOS 10.12.6
> - Oracle JDK 1.8.0_144
> - Flink 1.3.2
>Reporter: Peter Ertl
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.4.0
>
>
> The class *ContinuousFileReaderOperator* is swallowing exceptions as the 
> following example demonstrates:
> {code:java}
> package org.apache.flink.streaming.examples;
> import java.io.File;
> import java.io.IOException;
> import java.io.PrintWriter;
> import org.apache.flink.api.common.io.OutputFormat;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> public class FormatExceptionSwallowed {
>   public static void main(String[] args) throws Exception {
>   final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   File bla = File.createTempFile("foo", "baz");
>   try(PrintWriter w = new PrintWriter(bla)) {
>   w.println("one");
>   w.println("two");
>   w.println("three");
>   }
>   env.readTextFile(bla.getCanonicalPath())
>   .writeUsingOutputFormat(new OutputFormat<String>() {
>   @Override
>   public void configure(final Configuration 
> parameters) {
>   }
>   @Override
>   public void open(final int taskNumber, final 
> int numTasks) throws IOException {
>   }
>   @Override
>   public void writeRecord(final String record) 
> throws IOException {
>   throw new 
> IllegalArgumentException("bla");
>   }
>   @Override
>   public void close() throws IOException {
>   }
>   });
>   env.execute("go");
>   
>   // JOB TERMINATES WITH NO EXCEPTION / ERROR whatsoever ... 
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work

2017-09-06 Thread Peter Ertl (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155873#comment-16155873
 ] 

Peter Ertl commented on FLINK-7567:
---

If the parallelism of the feedback stream MUST be equal to the parallelism of the 
input stream, why not make it the default?
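
For reference, below is a minimal, untested workaround sketch for the example quoted in the issue: explicitly set the parallelism of the feedback stream so it matches the parallelism of the source ({{env.fromElements}} runs with parallelism 1). This is only an illustration of the constraint, not an official recommendation.

{code}
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val result = env.fromElements(1, 2, 3).iterate(it => {
  // align the feedback parallelism with the source parallelism (1)
  val feedback = it.filter(_ > 0).map(_ - 1).setParallelism(1)
  val output = it.filter(_ > 0).map(_ => 'x')
  (feedback, output)
})

result.print()
env.execute()
{code}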

> DataStream#iterate() on env.fromElements() / env.fromCollection() does not 
> work
> ---
>
> Key: FLINK-7567
> URL: https://issues.apache.org/jira/browse/FLINK-7567
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.2
> Environment: OS X 10.12.6, Oracle JDK 1.8.0_144, Flink 1.3.2
>Reporter: Peter Ertl
>Assignee: Mikhail Lipkovich
>
> When I try to execute this simple snippet of code
> {code}
>   @Test
>   def iterateOnElements(): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> // do something silly just do get iteration going ...
> val result = env.fromElements(1, 2, 3).iterate(it => {
>   (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x'))
> })
> result.print()
> env.execute()
>   }
> {code}
> I get the following exception:
> {code}
> java.lang.UnsupportedOperationException: Parallelism of the feedback stream 
> must match the parallelism of the original stream. Parallelism of original 
> stream: 1; parallelism of feedback stream: 8
>   at 
> org.apache.flink.streaming.api.transformations.FeedbackTransformation.addFeedbackEdge(FeedbackTransformation.java:87)
>   at 
> org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:77)
>   at 
> org.apache.flink.streaming.api.scala.DataStream.iterate(DataStream.scala:519)
>   at atp.analytics.CassandraReadTest.iterate2(CassandraReadTest.scala:134)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}
> Since this is just the simplest iterating-stream setup I could imagine, this error 
> makes no sense to me :-P



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7407) Assumption of partition id strict contiguity is too naive in Kafka consumer's AbstractPartitionDiscoverer

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155858#comment-16155858
 ] 

ASF GitHub Bot commented on FLINK-7407:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4526
  
@tzulitai Did you end up merging this?


> Assumption of partition id strict contiguity is too naive in Kafka consumer's 
> AbstractPartitionDiscoverer
> -
>
> Key: FLINK-7407
> URL: https://issues.apache.org/jira/browse/FLINK-7407
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.4.0
>
>
> In the Kafka Consumer's {{AbstractPartitionDiscoverer}}, for partition 
> discovery, already discovered partitions are tracked with the following map:
> {code}
> Map<String, Integer> topicsToLargestDiscoveredPartitionId
> {code}
> Simply put, on each discovery attempt's metadata fetch, all partition ids of 
> a given topic that are smaller than the largest seen id will be ignored and 
> not assigned. This approach lies on the assumption that fetched partition ids 
> of a single topic are always strictly contiguous starting from 0.
> This assumption may be too naive, in that partitions which were temporarily 
> unavailable at the time of a discovery would be shadowed by available 
> partitions with larger ids, and from then on would be left unassigned.
> We should redesign how the {{AbstractPartitionDiscoverer}} tracks discovered 
> partitions by not relying on the contiguity assumption, and also add test 
> cases for non-contiguous fetched partition ids.
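
A hedged sketch of the proposed direction: remember every discovered (topic, partition) pair instead of only the largest partition id per topic, so partitions that only become visible in a later metadata fetch are still assigned. The class and method names below are illustrative assumptions, not the actual {{AbstractPartitionDiscoverer}} code.

{code:java}
import java.util.HashSet;
import java.util.Set;

public class SeenPartitionsTracker {

	// every "topic#partition" key that has been discovered so far
	private final Set<String> seenPartitions = new HashSet<>();

	/** Returns true iff the partition was not seen before and should therefore be assigned. */
	public boolean setAndCheckDiscoveredPartition(String topic, int partitionId) {
		return seenPartitions.add(topic + "#" + partitionId);
	}
}
{code}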



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4526: [FLINK-7407] [kafka] Adapt AbstractPartitionDiscoverer to...

2017-09-06 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4526
  
@tzulitai Did you end up merging this?


---


[jira] [Commented] (FLINK-7357) HOP_START() HOP_END() does not work when using HAVING clause with GROUP BY HOP window

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155848#comment-16155848
 ] 

ASF GitHub Bot commented on FLINK-7357:
---

Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/4521
  
@twalthr looks like this should resolve the last filter issue. Please 
kindly take a look and see if this looks good to go in :-)


> HOP_START() HOP_END() does not work when using HAVING clause with GROUP BY 
> HOP window
> -
>
> Key: FLINK-7357
> URL: https://issues.apache.org/jira/browse/FLINK-7357
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> The following SQL does not compile:
> {code:title=invalid_having_hop_start_sql}
> SELECT 
>   c AS k, 
>   COUNT(a) AS v, 
>   HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS 
> windowStart, 
>   HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowEnd 
> FROM 
>   T1 
> GROUP BY 
>   HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE), 
>   c 
> HAVING 
>   SUM(b) > 1
> {code}
> While keeping only the HAVING clause or only the HOP_START field, the query compiles and runs 
> without issue.
> more details: 
> https://github.com/apache/flink/compare/master...walterddr:having_does_not_work_with_hop_start_end



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4521: [FLINK-7357] [table] Created extended rules for WindowSta...

2017-09-06 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/4521
  
@twalthr looks like this should resolve the last filter issue. Please 
kindly take a look and see if this looks good to go in :-)


---


[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137292784
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobService.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A service to retrieve transient binary large objects (BLOBs).
+ * 
+ * These include per-job BLOBs that are , e.g. a job's JAR files, parts of 
an off-loaded {@link
--- End diff --

I thought that a job's JAR files are stored in the permanent blob service?


---


[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137291887
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides access to transient BLOB files stored at the {@link 
BlobServer}.
+ *
+ * TODO: currently, this is still cache-based with local copies - make 
this truly transient, i.e. return file streams with no local copy
+ */
+public class TransientBlobCache implements TransientBlobService {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TransientBlobCache.class);
+
+   /** Counter to generate unique names for temporary files. */
+   private final AtomicLong tempFileCounter = new AtomicLong(0);
+
+   private final InetSocketAddress serverAddress;
+
+   /**
+* Root directory for local file storage
+*/
+   private final File storageDir;
+
+   private final AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+   /** Shutdown hook thread to ensure deletion of the local storage 
directory. */
+   private final Thread shutdownHook;
+
+   /** The number of retries when the transfer fails */
+   private final int numFetchRetries;
+
+   /** Configuration for the blob client like ssl parameters required to 
connect to the blob server */
+   private final Configuration blobClientConfig;
+
+   /** Lock guarding concurrent file accesses */
+   private final ReadWriteLock readWriteLock;
+
+   /**
+* Instantiates a new BLOB cache.
+*
+* @param serverAddress
+*  address of the {@link BlobServer} to use for fetching 
files from
+* @param blobClientConfig
+*  global configuration
+*
+* @throws IOException
+*  thrown if the (local or distributed) file storage 
cannot be created or is not usable
+*/
+   public TransientBlobCache(
+   final InetSocketAddress serverAddress,
+   final Configuration blobClientConfig) throws 
IOException {
--- End diff --

Probably not so easy because we also need it for the `BlobClient` creation.


---


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1619#comment-1619
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137259367
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java ---
@@ -257,7 +265,88 @@ public void testBlobServerRecovery() throws Exception {
try {
blobStoreService = 
BlobUtils.createBlobStoreFromConfig(config);
 
-   BlobRecoveryITCase.testBlobServerRecovery(config, 
blobStoreService);
+   BlobServerRecoveryTest.testBlobServerRecovery(config, 
blobStoreService);
+   } finally {
+   if (blobStoreService != null) {
+   blobStoreService.closeAndCleanupAllData();
+   }
+   }
+   }
+
+   /**
+* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed 
corrupted JARs are
+* recognised during the download via a {@link 
org.apache.flink.runtime.blob.BlobServer}.
+*/
+   @Test
+   public void testBlobServerCorruptedFile() throws Exception {
+   org.apache.flink.configuration.Configuration
+   config = new 
org.apache.flink.configuration.Configuration();
+   config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+   config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
--- End diff --

I think "ZooKeeper" is not a valid state backend. What did you want to do 
with that?


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137287126
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * A service to retrieve permanent binary large objects (BLOBs).
+ * 
+ * These include per-job BLOBs that are covered by high-availability (HA) 
mode, e.g. a job's JAR
+ * files, parts of an off-loaded {@link 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor}
+ * or files in the {@link 
org.apache.flink.api.common.cache.DistributedCache}.
+ */
+public interface PermanentBlobService extends Closeable {
+
+   /**
+* Returns the path to a local copy of the file associated with the 
provided job ID and blob
+* key.
+*
+* @param jobId
+*  ID of the job this blob belongs to
+* @param key
+*  BLOB key associated with the requested file
+*
+* @return The path to the file.
+*
+* @throws java.io.FileNotFoundException
+*  if the BLOB does not exist;
+* @throws IOException
+*  if any other error occurs when retrieving the file
+*/
+   File getHAFile(JobID jobId, BlobKey key) throws IOException;
--- End diff --

Not sure whether this is the right name because HA does not depend on the 
`PermanentBlobService` but on the `BlobStore`. I would suggest renaming it.
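
For example, a renamed signature could read as follows (the interface and method names below are placeholders for illustration, not the PR's actual API):

{code:java}
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.BlobKey;

import java.io.File;
import java.io.IOException;

/** Hypothetical naming sketch: the method name no longer implies HA semantics. */
interface PermanentBlobServiceSketch {

    /** Returns a local copy of the BLOB for the given job ID and key. */
    File getFile(JobID jobId, BlobKey key) throws IOException;
}
{code}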


---


[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137270473
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -389,95 +413,332 @@ public File getFile(JobID jobId, BlobKey key) throws 
IOException {
 *
 * @param jobId
 *  ID of the job this blob belongs to (or null if 
job-unrelated)
-* @param requiredBlob
+* @param blobKey
 *  blob key associated with the requested file
+* @param highlyAvailable
+*  whether to the requested file is highly available (HA)
 *
 * @return file referring to the local storage location of the BLOB
 *
 * @throws IOException
 *  Thrown if the file retrieval failed.
 */
-   private File getFileInternal(@Nullable JobID jobId, BlobKey 
requiredBlob) throws IOException {
-   checkArgument(requiredBlob != null, "BLOB key cannot be null.");
+   private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey, 
boolean highlyAvailable) throws IOException {
--- End diff --

Maybe we could introduce an enum here as well for the `highlyAvailable` 
boolean argument.
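
For illustration, such an enum could look like the following (the type and constant names are hypothetical, not taken from the PR):

{code:java}
/** Illustrative sketch only -- names are hypothetical, not the PR's API. */
public enum BlobType {
    /** BLOB that lives for the whole job lifetime and may be backed by the HA store. */
    PERMANENT_BLOB,
    /** BLOB used for one-off transfers such as log or RPC offloading. */
    TRANSIENT_BLOB
}
{code}

A call like `getFileInternal(jobId, blobKey, BlobType.PERMANENT_BLOB)` would then be self-documenting, compared to passing a bare `true`.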


---


[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137291176
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides access to transient BLOB files stored at the {@link 
BlobServer}.
+ *
+ * TODO: currently, this is still cache-based with local copies - make 
this truly transient, i.e. return file streams with no local copy
+ */
+public class TransientBlobCache implements TransientBlobService {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TransientBlobCache.class);
+
+   /** Counter to generate unique names for temporary files. */
+   private final AtomicLong tempFileCounter = new AtomicLong(0);
+
+   private final InetSocketAddress serverAddress;
+
+   /**
+* Root directory for local file storage
+*/
+   private final File storageDir;
+
+   private final AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+   /** Shutdown hook thread to ensure deletion of the local storage 
directory. */
+   private final Thread shutdownHook;
+
+   /** The number of retries when the transfer fails */
+   private final int numFetchRetries;
+
+   /** Configuration for the blob client like ssl parameters required to 
connect to the blob server */
+   private final Configuration blobClientConfig;
+
+   /** Lock guarding concurrent file accesses */
+   private final ReadWriteLock readWriteLock;
+
+   /**
+* Instantiates a new BLOB cache.
+*
+* @param serverAddress
+*  address of the {@link BlobServer} to use for fetching 
files from
+* @param blobClientConfig
+*  global configuration
+*
+* @throws IOException
+*  thrown if the (local or distributed) file storage 
cannot be created or is not usable
+*/
+   public TransientBlobCache(
+   final InetSocketAddress serverAddress,
+   final Configuration blobClientConfig) throws 
IOException {
+
+   this.serverAddress = checkNotNull(serverAddress);
+   this.blobClientConfig = checkNotNull(blobClientConfig);
+   this.readWriteLock = new ReentrantReadWriteLock();
+
+   // configure and create the storage directory
+   String storageDirectory = 
blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY);
+   this.storageDir = 
BlobUtils.initLocalStorageDirectory(storageDirectory);
+   LOG.info("Created transient BLOB cache storage directory " + 
storageDir);
+
+   // configure the number of fetch retries
+   final int fetchRetries = 
blobClientConfig.getInteger(BlobServerOptions.FETCH_RETRIES);
+   if (fetchRetries >= 0) {
+   this.numFetchRetries = fetchRetries;
+   } else {
+   

[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155564#comment-16155564
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137291176
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides access to transient BLOB files stored at the {@link 
BlobServer}.
+ *
+ * TODO: currently, this is still cache-based with local copies - make 
this truly transient, i.e. return file streams with no local copy
+ */
+public class TransientBlobCache implements TransientBlobService {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TransientBlobCache.class);
+
+   /** Counter to generate unique names for temporary files. */
+   private final AtomicLong tempFileCounter = new AtomicLong(0);
+
+   private final InetSocketAddress serverAddress;
+
+   /**
+* Root directory for local file storage
+*/
+   private final File storageDir;
+
+   private final AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+   /** Shutdown hook thread to ensure deletion of the local storage 
directory. */
+   private final Thread shutdownHook;
+
+   /** The number of retries when the transfer fails */
+   private final int numFetchRetries;
+
+   /** Configuration for the blob client like ssl parameters required to 
connect to the blob server */
+   private final Configuration blobClientConfig;
+
+   /** Lock guarding concurrent file accesses */
+   private final ReadWriteLock readWriteLock;
+
+   /**
+* Instantiates a new BLOB cache.
+*
+* @param serverAddress
+*  address of the {@link BlobServer} to use for fetching 
files from
+* @param blobClientConfig
+*  global configuration
+*
+* @throws IOException
+*  thrown if the (local or distributed) file storage 
cannot be created or is not usable
+*/
+   public TransientBlobCache(
+   final InetSocketAddress serverAddress,
+   final Configuration blobClientConfig) throws 
IOException {
+
+   this.serverAddress = checkNotNull(serverAddress);
+   this.blobClientConfig = checkNotNull(blobClientConfig);
+   this.readWriteLock = new ReentrantReadWriteLock();
+
+   // configure and create the storage directory
+   String storageDirectory = 
blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY);
+   this.storageDir = 
BlobUtils.initLocalStorageDirectory(storageDirectory);
+   LOG.info("Created transient BLOB cache storage directory " + 
storageDir);
+
+   // configure the number of fetch retries

[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1618#comment-1618
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137268133
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -601,7 +564,39 @@ public void deleteInternal(@Nullable JobID jobId, 
BlobKey key) throws IOExceptio
}
 
/**
-* Uploads the JAR files to a {@link BlobServer} at the given address.
+* Reads the response from the input stream and throws in case of errors
+*
+* @param is
+*  stream to read from
+*
+* @return  true if the delete operation was successful at the 
{@link BlobServer};
+*  false otherwise
+*
+* @throws IOException
+*  if the server code throws an exception or if reading 
the response failed
+*/
+   private static boolean receiveAndCheckDeleteResponse(InputStream is) 
throws IOException {
+   int response = is.read();
+   if (response < 0) {
+   throw new EOFException("Premature end of response");
+   }
+   if (response == RETURN_ERROR) {
+   Throwable cause = readExceptionFromStream(is);
+   if (cause == null) {
+   return false;
+   } else {
+   throw new IOException("Server side error: " + 
cause.getMessage(), cause);
--- End diff --

I think we don't have to append the cause message to `IOException's` 
message, because it is included in the `cause`.
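
A minimal sketch of that (the wrapper method and class names are illustrative):

{code:java}
import java.io.IOException;

/** Illustrative only: wrap the server-side failure without repeating its message. */
class ServerErrorSketch {

    static IOException wrapServerError(Throwable cause) {
        // the cause's own message stays reachable via getCause(), so there is no need to append it
        return new IOException("Server side error.", cause);
    }
}
{code}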


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155560#comment-16155560
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137287126
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * A service to retrieve permanent binary large objects (BLOBs).
+ * 
+ * These include per-job BLOBs that are covered by high-availability (HA) 
mode, e.g. a job's JAR
+ * files, parts of an off-loaded {@link 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor}
+ * or files in the {@link 
org.apache.flink.api.common.cache.DistributedCache}.
+ */
+public interface PermanentBlobService extends Closeable {
+
+   /**
+* Returns the path to a local copy of the file associated with the 
provided job ID and blob
+* key.
+*
+* @param jobId
+*  ID of the job this blob belongs to
+* @param key
+*  BLOB key associated with the requested file
+*
+* @return The path to the file.
+*
+* @throws java.io.FileNotFoundException
+*  if the BLOB does not exist;
+* @throws IOException
+*  if any other error occurs when retrieving the file
+*/
+   File getHAFile(JobID jobId, BlobKey key) throws IOException;
--- End diff --

Not sure whether this is the right name because HA does not depend on the 
`PermanentBlobService` but on the `BlobStore`. I would suggest renaming it.


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137295883
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
 ---
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides a cache for permanent BLOB files including a per-job 
ref-counting and a staged cleanup.
+ * 
+ * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache 
will first attempt to
+ * serve the file from its local cache. Only if the local cache does not 
contain the desired BLOB,
+ * it will try to download it from a distributed HA file system (if 
available) or the BLOB server.
+ * 
+ * If files for a job are not needed any more, they will enter a staged, 
i.e. deferred, cleanup.
+ * Files may thus still be be accessible upon recovery and do not need to 
be re-downloaded.
+ */
+public class PermanentBlobCache extends TimerTask implements 
PermanentBlobService {
--- End diff --

Just a wild thought: I noticed that the `TransientBlobCache` and the 
`PermanentBlobCache` have a lot of code in common. In order to reduce code 
duplication couldn't we create a common base class or let `PermanentBlobCache` 
extend `TransientBlobCache` adding the ref counting?
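
Something along these lines, as a rough sketch (class and field names are illustrative, not from the PR):

{code:java}
import org.apache.flink.configuration.Configuration;

import java.io.Closeable;
import java.io.File;
import java.net.InetSocketAddress;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Hypothetical base class holding the plumbing shared by the transient and
 * permanent caches; the permanent variant would only add the per-job ref-counting.
 */
abstract class AbstractBlobCacheSketch implements Closeable {

    protected final InetSocketAddress serverAddress;
    protected final File storageDir;
    protected final Configuration blobClientConfig;
    protected final int numFetchRetries;
    protected final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    protected AbstractBlobCacheSketch(
            InetSocketAddress serverAddress,
            Configuration blobClientConfig,
            File storageDir,
            int numFetchRetries) {
        this.serverAddress = serverAddress;
        this.blobClientConfig = blobClientConfig;
        this.storageDir = storageDir;
        this.numFetchRetries = numFetchRetries;
    }
}
{code}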


---


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155562#comment-16155562
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137270473
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -389,95 +413,332 @@ public File getFile(JobID jobId, BlobKey key) throws 
IOException {
 *
 * @param jobId
 *  ID of the job this blob belongs to (or null if 
job-unrelated)
-* @param requiredBlob
+* @param blobKey
 *  blob key associated with the requested file
+* @param highlyAvailable
+*  whether to the requested file is highly available (HA)
 *
 * @return file referring to the local storage location of the BLOB
 *
 * @throws IOException
 *  Thrown if the file retrieval failed.
 */
-   private File getFileInternal(@Nullable JobID jobId, BlobKey 
requiredBlob) throws IOException {
-   checkArgument(requiredBlob != null, "BLOB key cannot be null.");
+   private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey, 
boolean highlyAvailable) throws IOException {
--- End diff --

Maybe we could introduce an enum here as well for the `highlyAvailable` 
boolean argument.


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137302842
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -174,7 +176,6 @@ public ServerSocket createSocket(int port) throws 
IOException {
// start the server thread
setName("BLOB Server listener at " + getPort());
setDaemon(true);
-   start();
--- End diff --

Why did you pull `start` out of the constructor? Wouldn't one always want 
to start the `BlobServer` when creating it?


---


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155570#comment-16155570
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137289123
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
 ---
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides a cache for permanent BLOB files including a per-job 
ref-counting and a staged cleanup.
+ * 
+ * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache 
will first attempt to
+ * serve the file from its local cache. Only if the local cache does not 
contain the desired BLOB,
+ * it will try to download it from a distributed HA file system (if 
available) or the BLOB server.
+ * 
+ * If files for a job are not needed any more, they will enter a staged, 
i.e. deferred, cleanup.
+ * Files may thus still be be accessible upon recovery and do not need to 
be re-downloaded.
+ */
+public class PermanentBlobCache extends TimerTask implements 
PermanentBlobService {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(PermanentBlobCache.class);
+
+   /** Counter to generate unique names for temporary files. */
+   private final AtomicLong tempFileCounter = new AtomicLong(0);
+
+   private final InetSocketAddress serverAddress;
+
+   /** Root directory for local file storage */
+   private final File storageDir;
+
+   /** Blob store for distributed file storage, e.g. in HA */
+   private final BlobView blobView;
+
+   private final AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+   /** Shutdown hook thread to ensure deletion of the storage directory. */
+   private final Thread shutdownHook;
+
+   /** The number of retries when the transfer fails */
+   private final int numFetchRetries;
+
+   /** Configuration for the blob client like ssl parameters required to 
connect to the blob server */
+   private final Configuration blobClientConfig;
+
+   /** Lock guarding concurrent file accesses */
+   private final ReadWriteLock readWriteLock;
+
+   // 

+
+   /**
+* Job reference counters with a time-to-live (TTL).
+*/
+   @VisibleForTesting
+   static class RefCount {
+   /**
+* Number of references to a job.
+*/
+   public int references = 0;
+
+   /**
+* Timestamp in milliseconds when any job data should be 
cleaned up (no cleanup for
+* non-positive values).
+*/
+   public lon

[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155571#comment-16155571
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137291080
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides access to transient BLOB files stored at the {@link 
BlobServer}.
+ *
+ * TODO: currently, this is still cache-based with local copies - make 
this truly transient, i.e. return file streams with no local copy
+ */
+public class TransientBlobCache implements TransientBlobService {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TransientBlobCache.class);
+
+   /** Counter to generate unique names for temporary files. */
+   private final AtomicLong tempFileCounter = new AtomicLong(0);
+
+   private final InetSocketAddress serverAddress;
+
+   /**
+* Root directory for local file storage
+*/
+   private final File storageDir;
+
+   private final AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+   /** Shutdown hook thread to ensure deletion of the local storage 
directory. */
+   private final Thread shutdownHook;
+
+   /** The number of retries when the transfer fails */
+   private final int numFetchRetries;
+
+   /** Configuration for the blob client like ssl parameters required to 
connect to the blob server */
+   private final Configuration blobClientConfig;
+
+   /** Lock guarding concurrent file accesses */
+   private final ReadWriteLock readWriteLock;
+
+   /**
+* Instantiates a new BLOB cache.
+*
+* @param serverAddress
+*  address of the {@link BlobServer} to use for fetching 
files from
+* @param blobClientConfig
+*  global configuration
+*
+* @throws IOException
+*  thrown if the (local or distributed) file storage 
cannot be created or is not usable
+*/
+   public TransientBlobCache(
+   final InetSocketAddress serverAddress,
+   final Configuration blobClientConfig) throws 
IOException {
--- End diff --

Can we change it such that we don't pass in a `Configuration` object but 
instead the required values?
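
As a sketch (the field and parameter choices are illustrative; the actual set of required values may differ, and the `BlobClient` creation mentioned earlier in this thread would still need its settings passed separately):

{code:java}
import java.io.File;
import java.net.InetSocketAddress;

/** Hypothetical constructor taking the individual settings instead of a Configuration. */
class TransientBlobCacheSketch {

    private final InetSocketAddress serverAddress;
    private final File storageDir;
    private final int numFetchRetries;

    TransientBlobCacheSketch(
            InetSocketAddress serverAddress,
            File storageDir,
            int numFetchRetries) {
        this.serverAddress = serverAddress;
        this.storageDir = storageDir;
        this.numFetchRetries = numFetchRetries;
    }
}
{code}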


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.

[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155574#comment-16155574
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137259815
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java ---
@@ -257,7 +265,88 @@ public void testBlobServerRecovery() throws Exception {
try {
blobStoreService = 
BlobUtils.createBlobStoreFromConfig(config);
 
-   BlobRecoveryITCase.testBlobServerRecovery(config, 
blobStoreService);
+   BlobServerRecoveryTest.testBlobServerRecovery(config, 
blobStoreService);
+   } finally {
+   if (blobStoreService != null) {
+   blobStoreService.closeAndCleanupAllData();
+   }
+   }
+   }
+
+   /**
+* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed 
corrupted JARs are
+* recognised during the download via a {@link 
org.apache.flink.runtime.blob.BlobServer}.
+*/
+   @Test
+   public void testBlobServerCorruptedFile() throws Exception {
+   org.apache.flink.configuration.Configuration
+   config = new 
org.apache.flink.configuration.Configuration();
+   config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+   config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
+   config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+   config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
hdfsURI);
+
+   BlobStoreService blobStoreService = null;
+
+   try {
+   blobStoreService = 
BlobUtils.createBlobStoreFromConfig(config);
+
+   
BlobServerCorruptionTest.testGetFailsFromCorruptFile(config, blobStoreService, 
exception);
+   } finally {
+   if (blobStoreService != null) {
+   blobStoreService.closeAndCleanupAllData();
+   }
+   }
+   }
+
+   /**
+* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed 
JARs are recoverable from any
+* participating BlobServer when uploaded via a {@link 
org.apache.flink.runtime.blob.BlobCache}.
+*/
+   @Test
+   public void testBlobCacheRecovery() throws Exception {
+   org.apache.flink.configuration.Configuration
+   config = new 
org.apache.flink.configuration.Configuration();
+   config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+   config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
--- End diff --

State backend not defined.


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155575#comment-16155575
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137295883
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
 ---
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides a cache for permanent BLOB files including a per-job 
ref-counting and a staged cleanup.
+ * 
+ * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache 
will first attempt to
+ * serve the file from its local cache. Only if the local cache does not 
contain the desired BLOB,
+ * it will try to download it from a distributed HA file system (if 
available) or the BLOB server.
+ * 
+ * If files for a job are not needed any more, they will enter a staged, 
i.e. deferred, cleanup.
+ * Files may thus still be be accessible upon recovery and do not need to 
be re-downloaded.
+ */
+public class PermanentBlobCache extends TimerTask implements 
PermanentBlobService {
--- End diff --

Just a wild thought: I noticed that the `TransientBlobCache` and the 
`PermanentBlobCache` have a lot of code in common. In order to reduce code 
duplication couldn't we create a common base class or let `PermanentBlobCache` 
extend `TransientBlobCache` adding the ref counting?


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137271569
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -389,95 +413,332 @@ public File getFile(JobID jobId, BlobKey key) throws 
IOException {
 *
 * @param jobId
 *  ID of the job this blob belongs to (or null if 
job-unrelated)
-* @param requiredBlob
+* @param blobKey
 *  blob key associated with the requested file
+* @param highlyAvailable
+*  whether to the requested file is highly available (HA)
 *
 * @return file referring to the local storage location of the BLOB
 *
 * @throws IOException
 *  Thrown if the file retrieval failed.
 */
-   private File getFileInternal(@Nullable JobID jobId, BlobKey 
requiredBlob) throws IOException {
-   checkArgument(requiredBlob != null, "BLOB key cannot be null.");
+   private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey, 
boolean highlyAvailable) throws IOException {
+   checkArgument(blobKey != null, "BLOB key cannot be null.");
 
-   final File localFile = BlobUtils.getStorageLocation(storageDir, 
jobId, requiredBlob);
+   final File localFile = BlobUtils.getStorageLocation(storageDir, 
jobId, blobKey);
+   readWriteLock.readLock().lock();
 
-   if (localFile.exists()) {
+   try {
+   getFileInternal(jobId, blobKey, highlyAvailable, 
localFile);
return localFile;
+   } finally {
+   readWriteLock.readLock().unlock();
}
-   else {
+   }
+
+   /**
+* Helper to retrieve the local path of a file associated with a job 
and a blob key.
+* 
+* The blob server looks the blob key up in its local storage. If the 
file exists, it is
+* returned. If the file does not exist, it is retrieved from the HA 
blob store (if available)
+* or a {@link FileNotFoundException} is thrown.
+* 
+* Assumes the read lock has already been acquired.
+*
+* @param jobId
+*  ID of the job this blob belongs to (or null if 
job-unrelated)
+* @param blobKey
+*  blob key associated with the requested file
+* @param highlyAvailable
+*  whether to the requested file is highly available (HA)
+* @param localFile
+*  (local) file where the blob is/should be stored
+*
+* @throws IOException
+*  Thrown if the file retrieval failed.
+*/
+   void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean 
highlyAvailable, File localFile) throws IOException {
+   // assume readWriteLock.readLock() was already locked (cannot 
really check that)
+
+   if (localFile.exists()) {
+   return;
+   } else if (highlyAvailable) {
+   // Try the HA blob store
+   // first we have to release the read lock in order to 
acquire the write lock
+   readWriteLock.readLock().unlock();
+
+   // use a temporary file (thread-safe without locking)
+   File incomingFile = null;
try {
-   // Try the blob store
-   blobStore.get(jobId, requiredBlob, localFile);
+   incomingFile = createTemporaryFilename();
+   blobStore.get(jobId, blobKey, incomingFile);
+
+   BlobUtils.moveTempFileToStore(
+   incomingFile, jobId, blobKey, 
localFile, readWriteLock.writeLock(), LOG, null);
--- End diff --

Not sure whether the `writeLock` should escape the scope of the BlobServer 
via `BlobUtils.moveTempFileToStore`. I think it would be better to lock outside 
of the `moveTempFileToStore` method. This should also give a better separation 
of concerns.
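
A rough sketch of caller-side locking (the helper below stands in for a lock-free variant of `moveTempFileToStore` and is not the PR's actual API):

{code:java}
import java.io.File;
import java.io.IOException;
import java.util.concurrent.locks.Lock;

/** Illustrative only: the write lock is taken and released by the BlobServer-side caller. */
class MoveUnderLockSketch {

    static void moveUnderWriteLock(Lock writeLock, File incomingFile, File localFile) throws IOException {
        writeLock.lock();
        try {
            // a lock-free move helper would only deal with the file system here
            if (!incomingFile.renameTo(localFile)) {
                throw new IOException("Failed to move " + incomingFile + " to " + localFile);
            }
        } finally {
            writeLock.unlock();
        }
    }
}
{code}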


---


[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137268133
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -601,7 +564,39 @@ public void deleteInternal(@Nullable JobID jobId, 
BlobKey key) throws IOExceptio
}
 
/**
-* Uploads the JAR files to a {@link BlobServer} at the given address.
+* Reads the response from the input stream and throws in case of errors
+*
+* @param is
+*  stream to read from
+*
+* @return  true if the delete operation was successful at the 
{@link BlobServer};
+*  false otherwise
+*
+* @throws IOException
+*  if the server code throws an exception or if reading 
the response failed
+*/
+   private static boolean receiveAndCheckDeleteResponse(InputStream is) 
throws IOException {
+   int response = is.read();
+   if (response < 0) {
+   throw new EOFException("Premature end of response");
+   }
+   if (response == RETURN_ERROR) {
+   Throwable cause = readExceptionFromStream(is);
+   if (cause == null) {
+   return false;
+   } else {
+   throw new IOException("Server side error: " + 
cause.getMessage(), cause);
--- End diff --

I think we don't have to append the cause message to `IOException's` 
message, because it is included in the `cause`.


---


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1617#comment-1617
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137266511
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -220,19 +260,27 @@ InputStream getInternal(@Nullable JobID jobId, 
BlobKey blobKey) throws IOExcepti
 *  ID of the job this blob belongs to (or null if 
job-unrelated)
 * @param blobKey
 *  blob key associated with the requested file
+* @param permanentBlob
+*  whether the BLOB is permanent (true) or 
transient (false)
 *
 * @throws IOException
 * thrown if an I/O error occurs while writing the header data 
to the output stream
 */
-   private static void sendGetHeader(OutputStream outputStream, @Nullable 
JobID jobId, BlobKey blobKey) throws IOException {
+   private static void sendGetHeader(
+   OutputStream outputStream, @Nullable JobID jobId, 
BlobKey blobKey, boolean permanentBlob)
+   throws IOException {
checkNotNull(blobKey);
+   checkArgument(jobId != null || !permanentBlob, "permanent BLOBs 
must be job-related");
 
// Signal type of operation
outputStream.write(GET_OPERATION);
 
// Send job ID and key
if (jobId == null) {
outputStream.write(CONTENT_NO_JOB);
+   } else if (permanentBlob) {
+   outputStream.write(CONTENT_FOR_JOB_HA);
--- End diff --

Does it make sense to rename this constant to `PERMANENT_JOB_CONTENT`?


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155561#comment-16155561
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137293376
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -215,10 +215,10 @@ public static ClassLoader retrieveClassLoader(
JobManagerMessages.ClassloadingProps props = 
optProps.get();
 
InetSocketAddress serverAddress = new 
InetSocketAddress(jobManager.getHostname(), props.blobManagerPort());
-   final BlobCache blobClient;
+   final PermanentBlobCache blobClient;
try {
-   // TODO: Fix lifecycle of BlobCache to properly 
close it upon usage
-   blobClient = new BlobCache(serverAddress, 
config, highAvailabilityServices.createBlobStore());
+   // TODO: Fix lifecycle of PermanentBlobCache to 
properly close it upon usage
+   blobClient = new 
PermanentBlobCache(serverAddress, config, 
highAvailabilityServices.createBlobStore());
--- End diff --

Shouldn't the variable be called `permanentBlobCache` instead of 
`blobClient`?


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155577#comment-16155577
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137268826
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -415,13 +393,17 @@ private BlobKey putBuffer(@Nullable JobID jobId, 
byte[] value, int offset, int l
 *  the ID of the job the BLOB belongs to (or null 
if job-unrelated)
 * @param inputStream
 *  the input stream to read the data from
+* @param permanentBlob
+*  whether the BLOB is permanent (true) or 
transient (false)
 *
 * @return the computed BLOB key of the uploaded BLOB
 *
 * @throws IOException
 *  thrown if an I/O error occurs while uploading the data 
to the BLOB server
 */
-   private BlobKey putInputStream(@Nullable JobID jobId, InputStream 
inputStream) throws IOException {
+   BlobKey putInputStream(@Nullable JobID jobId, InputStream inputStream, 
boolean permanentBlob)
--- End diff --

Should we introduce an `enum` instead of a boolean denoting whether the 
content is transient or permanent? This would have the advantage that it's much 
clearer what's happening when looking at the code at the call site.
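
Purely as an illustration of the call-site readability (the interface, method, and enum names are hypothetical, not the PR's API):

{code:java}
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.BlobKey;

import java.io.IOException;
import java.io.InputStream;

/** Hypothetical sketch contrasting a boolean flag with an enum at the call site. */
interface BlobWriterSketch {

    enum BlobType { PERMANENT_BLOB, TRANSIENT_BLOB }

    /** Boolean version: put(jobId, in, true) -- the meaning of 'true' is invisible. */
    BlobKey put(JobID jobId, InputStream in, boolean permanentBlob) throws IOException;

    /** Enum version: put(jobId, in, BlobType.PERMANENT_BLOB) documents itself. */
    BlobKey put(JobID jobId, InputStream in, BlobType blobType) throws IOException;
}
{code}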


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155572#comment-16155572
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137303136
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java ---
@@ -18,61 +18,31 @@
 
 package org.apache.flink.runtime.blob;
 
-import org.apache.flink.api.common.JobID;
-
 import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
 
 /**
  * A simple store and retrieve binary large objects (BLOBs).
  */
 public interface BlobService extends Closeable {
 
/**
-* Returns the path to a local copy of the (job-unrelated) file 
associated with the provided
-* blob key.
-*
-* @param key blob key associated with the requested file
-* @return The path to the file.
-* @throws java.io.FileNotFoundException when the path does not exist;
-* @throws IOException if any other error occurs when retrieving the 
file
-*/
-   File getFile(BlobKey key) throws IOException;
-
-   /**
-* Returns the path to a local copy of the file associated with the 
provided job ID and blob key.
+* Returns a BLOB service for accessing permanent BLOBs.
 *
-* @param jobId ID of the job this blob belongs to
-* @param key blob key associated with the requested file
-* @return The path to the file.
-* @throws java.io.FileNotFoundException when the path does not exist;
-* @throws IOException if any other error occurs when retrieving the 
file
+* @return BLOB service
 */
-   File getFile(JobID jobId, BlobKey key) throws IOException;
+   PermanentBlobService getPermanentBlobStore();
--- End diff --

I'm not so sure about the naming here. The `BlobStore` is actually 
something different from the `BlobService`. It would be good if we used consistent 
naming for the different things (this could also include renaming some of the 
entities).
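
One way to make the naming consistent, sketched here purely for illustration (not a proposal of the final API), is to let the accessor names match the returned service types:

{code:java}
import java.io.Closeable;

// Illustrative sketch only: accessor names aligned with the returned service types.
interface PermanentBlobService extends Closeable { /* accessors for permanent BLOBs */ }

interface TransientBlobService extends Closeable { /* accessors for transient BLOBs */ }

interface BlobService extends Closeable {

    /** Returns the sub-service for accessing permanent BLOBs. */
    PermanentBlobService getPermanentBlobService();

    /** Returns the sub-service for accessing transient BLOBs. */
    TransientBlobService getTransientBlobService();
}
{code}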


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155565#comment-16155565
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137266916
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -220,19 +260,27 @@ InputStream getInternal(@Nullable JobID jobId, 
BlobKey blobKey) throws IOExcepti
 *  ID of the job this blob belongs to (or null if 
job-unrelated)
 * @param blobKey
 *  blob key associated with the requested file
+* @param permanentBlob
+*  whether the BLOB is permanent (true) or 
transient (false)
 *
 * @throws IOException
 * thrown if an I/O error occurs while writing the header data 
to the output stream
 */
-   private static void sendGetHeader(OutputStream outputStream, @Nullable 
JobID jobId, BlobKey blobKey) throws IOException {
+   private static void sendGetHeader(
+   OutputStream outputStream, @Nullable JobID jobId, 
BlobKey blobKey, boolean permanentBlob)
+   throws IOException {
checkNotNull(blobKey);
+   checkArgument(jobId != null || !permanentBlob, "permanent BLOBs 
must be job-related");
 
// Signal type of operation
outputStream.write(GET_OPERATION);
 
// Send job ID and key
if (jobId == null) {
outputStream.write(CONTENT_NO_JOB);
--- End diff --

Should we rename this variable to `TRANSIENT_CONTENT`?
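
If the constants were renamed along these lines, the GET header could be written roughly as follows; the byte values below are made up and only the constant names matter for this discussion:

{code:java}
import java.io.IOException;
import java.io.OutputStream;

class GetHeaderSketch {

    // hypothetical values; only the constant names are the point here
    static final int GET_OPERATION = 0;
    static final int TRANSIENT_CONTENT = 1;      // was CONTENT_NO_JOB
    static final int TRANSIENT_JOB_CONTENT = 2;  // was CONTENT_FOR_JOB
    static final int PERMANENT_JOB_CONTENT = 3;  // was CONTENT_FOR_JOB_HA

    static void sendGetHeader(OutputStream out, byte[] jobIdBytes, boolean permanentBlob)
            throws IOException {
        out.write(GET_OPERATION);
        if (jobIdBytes == null) {
            out.write(TRANSIENT_CONTENT);          // job-unrelated BLOBs are always transient
        } else if (permanentBlob) {
            out.write(PERMANENT_JOB_CONTENT);
            out.write(jobIdBytes);
        } else {
            out.write(TRANSIENT_JOB_CONTENT);
            out.write(jobIdBytes);
        }
    }
}
{code}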


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137262572
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
 ---
@@ -158,7 +151,7 @@ protected void respondAsLeader(final 
ChannelHandlerContext ctx, final Routed rou
cache = blobPortFuture.thenApplyAsync(
(Integer port) -> {
try {
-   return new BlobCache(new 
InetSocketAddress(jobManagerGateway.getHostname(), port), config, blobView);
+   return new 
TransientBlobCache(new InetSocketAddress(jobManagerGateway.getHostname(), 
port), config);
} catch (IOException e) {
throw new 
FlinkFutureException("Could not create BlobCache.", e);
--- End diff --

Maybe we could adapt the exception message to `TransientBlobCache`.


---


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155567#comment-16155567
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137271569
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -389,95 +413,332 @@ public File getFile(JobID jobId, BlobKey key) throws 
IOException {
 *
 * @param jobId
 *  ID of the job this blob belongs to (or null if 
job-unrelated)
-* @param requiredBlob
+* @param blobKey
 *  blob key associated with the requested file
+* @param highlyAvailable
+*  whether to the requested file is highly available (HA)
 *
 * @return file referring to the local storage location of the BLOB
 *
 * @throws IOException
 *  Thrown if the file retrieval failed.
 */
-   private File getFileInternal(@Nullable JobID jobId, BlobKey 
requiredBlob) throws IOException {
-   checkArgument(requiredBlob != null, "BLOB key cannot be null.");
+   private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey, 
boolean highlyAvailable) throws IOException {
+   checkArgument(blobKey != null, "BLOB key cannot be null.");
 
-   final File localFile = BlobUtils.getStorageLocation(storageDir, 
jobId, requiredBlob);
+   final File localFile = BlobUtils.getStorageLocation(storageDir, 
jobId, blobKey);
+   readWriteLock.readLock().lock();
 
-   if (localFile.exists()) {
+   try {
+   getFileInternal(jobId, blobKey, highlyAvailable, 
localFile);
return localFile;
+   } finally {
+   readWriteLock.readLock().unlock();
}
-   else {
+   }
+
+   /**
+* Helper to retrieve the local path of a file associated with a job 
and a blob key.
+* 
+* The blob server looks the blob key up in its local storage. If the 
file exists, it is
+* returned. If the file does not exist, it is retrieved from the HA 
blob store (if available)
+* or a {@link FileNotFoundException} is thrown.
+* 
+* Assumes the read lock has already been acquired.
+*
+* @param jobId
+*  ID of the job this blob belongs to (or null if 
job-unrelated)
+* @param blobKey
+*  blob key associated with the requested file
+* @param highlyAvailable
+*  whether to the requested file is highly available (HA)
+* @param localFile
+*  (local) file where the blob is/should be stored
+*
+* @throws IOException
+*  Thrown if the file retrieval failed.
+*/
+   void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean 
highlyAvailable, File localFile) throws IOException {
+   // assume readWriteLock.readLock() was already locked (cannot 
really check that)
+
+   if (localFile.exists()) {
+   return;
+   } else if (highlyAvailable) {
+   // Try the HA blob store
+   // first we have to release the read lock in order to 
acquire the write lock
+   readWriteLock.readLock().unlock();
+
+   // use a temporary file (thread-safe without locking)
+   File incomingFile = null;
try {
-   // Try the blob store
-   blobStore.get(jobId, requiredBlob, localFile);
+   incomingFile = createTemporaryFilename();
+   blobStore.get(jobId, blobKey, incomingFile);
+
+   BlobUtils.moveTempFileToStore(
+   incomingFile, jobId, blobKey, 
localFile, readWriteLock.writeLock(), LOG, null);
--- End diff --

Not sure whether the `writeLock` should escape the scope of the `BlobServer` 
via `BlobUtils.moveTempFileToStore`. I think it would be better to lock outside 
of the `moveTempFileToStore` method. This should also give a better separation 
of concerns.
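
A rough sketch of that suggestion, holding the write lock at the call site so the utility method stays lock-free (the method names below are simplified stand-ins, not the real BlobUtils signatures):

{code:java}
import java.io.File;
import java.io.IOException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class MoveUnderLockSketch {

    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    /** The lock is taken here, so the utility below does not need a lock parameter. */
    void fetchFromHaStore(File incomingFile, File localFile) throws IOException {
        readWriteLock.writeLock().lock();
        try {
            moveTempFileToStore(incomingFile, localFile);
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }

    /** Simplified stand-in for a lock-free BlobUtils.moveTempFileToStore. */
    private static void moveTempFileToStore(File incomingFile, File localFile) throws IOException {
        if (!localFile.exists() && !incomingFile.renameTo(localFile)) {
            throw new IOException("Could not move " + incomingFile + " to " + localFile);
        }
    }
}
{code}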


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Re

[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155566#comment-16155566
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137262572
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
 ---
@@ -158,7 +151,7 @@ protected void respondAsLeader(final 
ChannelHandlerContext ctx, final Routed rou
cache = blobPortFuture.thenApplyAsync(
(Integer port) -> {
try {
-   return new BlobCache(new 
InetSocketAddress(jobManagerGateway.getHostname(), port), config, blobView);
+   return new 
TransientBlobCache(new InetSocketAddress(jobManagerGateway.getHostname(), 
port), config);
} catch (IOException e) {
throw new 
FlinkFutureException("Could not create BlobCache.", e);
--- End diff --

Maybe we could adapt the exception message to `TransientBlobCache`.


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155576#comment-16155576
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137302842
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -174,7 +176,6 @@ public ServerSocket createSocket(int port) throws 
IOException {
// start the server thread
setName("BLOB Server listener at " + getPort());
setDaemon(true);
-   start();
--- End diff --

Why did you pull `start` out of the constructor? Wouldn't one always want 
to start the `BlobServer` when creating it?
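
For context, a minimal illustration of the two lifecycles being discussed (hypothetical classes, not the real BlobServer):

{code:java}
/** Hypothetical illustration of construction-time start vs. explicit start. */
class ServerLifecycleSketch {

    static class EagerServer extends Thread {
        EagerServer() {
            setDaemon(true);
            start(); // the server thread is started as a side effect of construction
        }
        @Override
        public void run() { /* serve requests */ }
    }

    static class LazyServer extends Thread {
        LazyServer() {
            setDaemon(true); // construction only configures; the caller decides when to start
        }
        @Override
        public void run() { /* serve requests */ }
    }

    public static void main(String[] args) {
        new EagerServer();               // already running here
        LazyServer s = new LazyServer(); // not running yet
        s.start();                       // started explicitly by the caller
    }
}
{code}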


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137289123
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
 ---
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides a cache for permanent BLOB files including a per-job 
ref-counting and a staged cleanup.
+ * 
+ * When requesting BLOBs via {@link #getHAFile(JobID, BlobKey)}, the cache 
will first attempt to
+ * serve the file from its local cache. Only if the local cache does not 
contain the desired BLOB,
+ * it will try to download it from a distributed HA file system (if 
available) or the BLOB server.
+ * 
+ * If files for a job are not needed any more, they will enter a staged, 
i.e. deferred, cleanup.
+* Files may thus still be accessible upon recovery and do not need to 
be re-downloaded.
+ */
+public class PermanentBlobCache extends TimerTask implements 
PermanentBlobService {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(PermanentBlobCache.class);
+
+   /** Counter to generate unique names for temporary files. */
+   private final AtomicLong tempFileCounter = new AtomicLong(0);
+
+   private final InetSocketAddress serverAddress;
+
+   /** Root directory for local file storage */
+   private final File storageDir;
+
+   /** Blob store for distributed file storage, e.g. in HA */
+   private final BlobView blobView;
+
+   private final AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+   /** Shutdown hook thread to ensure deletion of the storage directory. */
+   private final Thread shutdownHook;
+
+   /** The number of retries when the transfer fails */
+   private final int numFetchRetries;
+
+   /** Configuration for the blob client like ssl parameters required to 
connect to the blob server */
+   private final Configuration blobClientConfig;
+
+   /** Lock guarding concurrent file accesses */
+   private final ReadWriteLock readWriteLock;
+
+   // 

+
+   /**
+* Job reference counters with a time-to-live (TTL).
+*/
+   @VisibleForTesting
+   static class RefCount {
+   /**
+* Number of references to a job.
+*/
+   public int references = 0;
+
+   /**
+* Timestamp in milliseconds when any job data should be 
cleaned up (no cleanup for
+* non-positive values).
+*/
+   public long keepUntil = -1;
+   }
+
+   /** Map to store the number of references to a specific job */
+   private final Map jobRefCounters = new HashMap<>();
+
+   /** Time interval (ms) to run the cleanup task; also used as the 
default TTL

[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137259798
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java ---
@@ -257,7 +265,88 @@ public void testBlobServerRecovery() throws Exception {
try {
blobStoreService = 
BlobUtils.createBlobStoreFromConfig(config);
 
-   BlobRecoveryITCase.testBlobServerRecovery(config, 
blobStoreService);
+   BlobServerRecoveryTest.testBlobServerRecovery(config, 
blobStoreService);
+   } finally {
+   if (blobStoreService != null) {
+   blobStoreService.closeAndCleanupAllData();
+   }
+   }
+   }
+
+   /**
+* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed 
corrupted JARs are
+* recognised during the download via a {@link 
org.apache.flink.runtime.blob.BlobServer}.
+*/
+   @Test
+   public void testBlobServerCorruptedFile() throws Exception {
+   org.apache.flink.configuration.Configuration
+   config = new 
org.apache.flink.configuration.Configuration();
+   config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+   config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
+   config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+   config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
hdfsURI);
+
+   BlobStoreService blobStoreService = null;
+
+   try {
+   blobStoreService = 
BlobUtils.createBlobStoreFromConfig(config);
--- End diff --

Why not create the `blobStoreService` outside of the try-finally block? 
Then you don't have to do the null check in the finally block.
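
The suggested shape would be roughly the following fragment (same test logic, with the resource created before the try block; this assumes a failure in createBlobStoreFromConfig leaves nothing behind that needs cleanup):

{code:java}
// sketch of the suggested structure, reusing the names from the diff above
BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
try {
    BlobServerRecoveryTest.testBlobServerRecovery(config, blobStoreService);
} finally {
    blobStoreService.closeAndCleanupAllData();
}
{code}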


---


[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137266698
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -220,19 +260,27 @@ InputStream getInternal(@Nullable JobID jobId, 
BlobKey blobKey) throws IOExcepti
 *  ID of the job this blob belongs to (or null if 
job-unrelated)
 * @param blobKey
 *  blob key associated with the requested file
+* @param permanentBlob
+*  whether the BLOB is permanent (true) or 
transient (false)
 *
 * @throws IOException
 * thrown if an I/O error occurs while writing the header data 
to the output stream
 */
-   private static void sendGetHeader(OutputStream outputStream, @Nullable 
JobID jobId, BlobKey blobKey) throws IOException {
+   private static void sendGetHeader(
+   OutputStream outputStream, @Nullable JobID jobId, 
BlobKey blobKey, boolean permanentBlob)
+   throws IOException {
checkNotNull(blobKey);
+   checkArgument(jobId != null || !permanentBlob, "permanent BLOBs 
must be job-related");
 
// Signal type of operation
outputStream.write(GET_OPERATION);
 
// Send job ID and key
if (jobId == null) {
outputStream.write(CONTENT_NO_JOB);
+   } else if (permanentBlob) {
+   outputStream.write(CONTENT_FOR_JOB_HA);
+   outputStream.write(jobId.getBytes());
} else {
outputStream.write(CONTENT_FOR_JOB);
--- End diff --

Same here to `TRANSIENT_JOB_CONTENT`?


---


[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137259815
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java ---
@@ -257,7 +265,88 @@ public void testBlobServerRecovery() throws Exception {
try {
blobStoreService = 
BlobUtils.createBlobStoreFromConfig(config);
 
-   BlobRecoveryITCase.testBlobServerRecovery(config, 
blobStoreService);
+   BlobServerRecoveryTest.testBlobServerRecovery(config, 
blobStoreService);
+   } finally {
+   if (blobStoreService != null) {
+   blobStoreService.closeAndCleanupAllData();
+   }
+   }
+   }
+
+   /**
+* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed 
corrupted JARs are
+* recognised during the download via a {@link 
org.apache.flink.runtime.blob.BlobServer}.
+*/
+   @Test
+   public void testBlobServerCorruptedFile() throws Exception {
+   org.apache.flink.configuration.Configuration
+   config = new 
org.apache.flink.configuration.Configuration();
+   config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+   config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
+   config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+   config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
hdfsURI);
+
+   BlobStoreService blobStoreService = null;
+
+   try {
+   blobStoreService = 
BlobUtils.createBlobStoreFromConfig(config);
+
+   
BlobServerCorruptionTest.testGetFailsFromCorruptFile(config, blobStoreService, 
exception);
+   } finally {
+   if (blobStoreService != null) {
+   blobStoreService.closeAndCleanupAllData();
+   }
+   }
+   }
+
+   /**
+* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed 
JARs are recoverable from any
+* participating BlobServer when uploaded via a {@link 
org.apache.flink.runtime.blob.BlobCache}.
+*/
+   @Test
+   public void testBlobCacheRecovery() throws Exception {
+   org.apache.flink.configuration.Configuration
+   config = new 
org.apache.flink.configuration.Configuration();
+   config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+   config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
--- End diff --

State backend not defined ("ZOOKEEPER" is not a valid state backend).


---


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155573#comment-16155573
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137266698
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -220,19 +260,27 @@ InputStream getInternal(@Nullable JobID jobId, 
BlobKey blobKey) throws IOExcepti
 *  ID of the job this blob belongs to (or null if 
job-unrelated)
 * @param blobKey
 *  blob key associated with the requested file
+* @param permanentBlob
+*  whether the BLOB is permanent (true) or 
transient (false)
 *
 * @throws IOException
 * thrown if an I/O error occurs while writing the header data 
to the output stream
 */
-   private static void sendGetHeader(OutputStream outputStream, @Nullable 
JobID jobId, BlobKey blobKey) throws IOException {
+   private static void sendGetHeader(
+   OutputStream outputStream, @Nullable JobID jobId, 
BlobKey blobKey, boolean permanentBlob)
+   throws IOException {
checkNotNull(blobKey);
+   checkArgument(jobId != null || !permanentBlob, "permanent BLOBs 
must be job-related");
 
// Signal type of operation
outputStream.write(GET_OPERATION);
 
// Send job ID and key
if (jobId == null) {
outputStream.write(CONTENT_NO_JOB);
+   } else if (permanentBlob) {
+   outputStream.write(CONTENT_FOR_JOB_HA);
+   outputStream.write(jobId.getBytes());
} else {
outputStream.write(CONTENT_FOR_JOB);
--- End diff --

Same here to `TRANSIENT_JOB_CONTENT`?


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155568#comment-16155568
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137292784
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobService.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A service to retrieve transient binary large objects (BLOBs).
+ * 
+ * These include per-job BLOBs that are , e.g. a job's JAR files, parts of 
an off-loaded {@link
--- End diff --

I thought that job's JAR files are stored in the permanent blob service?


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137293376
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -215,10 +215,10 @@ public static ClassLoader retrieveClassLoader(
JobManagerMessages.ClassloadingProps props = 
optProps.get();
 
InetSocketAddress serverAddress = new 
InetSocketAddress(jobManager.getHostname(), props.blobManagerPort());
-   final BlobCache blobClient;
+   final PermanentBlobCache blobClient;
try {
-   // TODO: Fix lifecycle of BlobCache to properly 
close it upon usage
-   blobClient = new BlobCache(serverAddress, 
config, highAvailabilityServices.createBlobStore());
+   // TODO: Fix lifecycle of PermanentBlobCache to 
properly close it upon usage
+   blobClient = new 
PermanentBlobCache(serverAddress, config, 
highAvailabilityServices.createBlobStore());
--- End diff --

shouldn't the variable be called `permanentBlobCache` instead of 
`blobClient`?


---


[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137303136
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java ---
@@ -18,61 +18,31 @@
 
 package org.apache.flink.runtime.blob;
 
-import org.apache.flink.api.common.JobID;
-
 import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
 
 /**
  * A simple store and retrieve binary large objects (BLOBs).
  */
 public interface BlobService extends Closeable {
 
/**
-* Returns the path to a local copy of the (job-unrelated) file 
associated with the provided
-* blob key.
-*
-* @param key blob key associated with the requested file
-* @return The path to the file.
-* @throws java.io.FileNotFoundException when the path does not exist;
-* @throws IOException if any other error occurs when retrieving the 
file
-*/
-   File getFile(BlobKey key) throws IOException;
-
-   /**
-* Returns the path to a local copy of the file associated with the 
provided job ID and blob key.
+* Returns a BLOB service for accessing permanent BLOBs.
 *
-* @param jobId ID of the job this blob belongs to
-* @param key blob key associated with the requested file
-* @return The path to the file.
-* @throws java.io.FileNotFoundException when the path does not exist;
-* @throws IOException if any other error occurs when retrieving the 
file
+* @return BLOB service
 */
-   File getFile(JobID jobId, BlobKey key) throws IOException;
+   PermanentBlobService getPermanentBlobStore();
--- End diff --

I'm not so sure about the naming here. The `BlobStore` is actually 
something different from the `BlobService`. It would be good if we used consistent 
naming for the different things (this could also include renaming some of the 
entities).


---


[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137266916
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -220,19 +260,27 @@ InputStream getInternal(@Nullable JobID jobId, 
BlobKey blobKey) throws IOExcepti
 *  ID of the job this blob belongs to (or null if 
job-unrelated)
 * @param blobKey
 *  blob key associated with the requested file
+* @param permanentBlob
+*  whether the BLOB is permanent (true) or 
transient (false)
 *
 * @throws IOException
 * thrown if an I/O error occurs while writing the header data 
to the output stream
 */
-   private static void sendGetHeader(OutputStream outputStream, @Nullable 
JobID jobId, BlobKey blobKey) throws IOException {
+   private static void sendGetHeader(
+   OutputStream outputStream, @Nullable JobID jobId, 
BlobKey blobKey, boolean permanentBlob)
+   throws IOException {
checkNotNull(blobKey);
+   checkArgument(jobId != null || !permanentBlob, "permanent BLOBs 
must be job-related");
 
// Signal type of operation
outputStream.write(GET_OPERATION);
 
// Send job ID and key
if (jobId == null) {
outputStream.write(CONTENT_NO_JOB);
--- End diff --

Should we rename this variable to `TRANSIENT_CONTENT`?


---


[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137268826
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -415,13 +393,17 @@ private BlobKey putBuffer(@Nullable JobID jobId, 
byte[] value, int offset, int l
 *  the ID of the job the BLOB belongs to (or null 
if job-unrelated)
 * @param inputStream
 *  the input stream to read the data from
+* @param permanentBlob
+*  whether the BLOB is permanent (true) or 
transient (false)
 *
 * @return the computed BLOB key of the uploaded BLOB
 *
 * @throws IOException
 *  thrown if an I/O error occurs while uploading the data 
to the BLOB server
 */
-   private BlobKey putInputStream(@Nullable JobID jobId, InputStream 
inputStream) throws IOException {
+   BlobKey putInputStream(@Nullable JobID jobId, InputStream inputStream, 
boolean permanentBlob)
--- End diff --

Should we introduce an `enum` instead of a boolean to denote whether the 
content is transient or permanent? That would have the advantage of making it much 
clearer what is happening when reading the code at the call site.


---


[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137291080
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides access to transient BLOB files stored at the {@link 
BlobServer}.
+ *
+ * TODO: currently, this is still cache-based with local copies - make 
this truly transient, i.e. return file streams with no local copy
+ */
+public class TransientBlobCache implements TransientBlobService {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TransientBlobCache.class);
+
+   /** Counter to generate unique names for temporary files. */
+   private final AtomicLong tempFileCounter = new AtomicLong(0);
+
+   private final InetSocketAddress serverAddress;
+
+   /**
+* Root directory for local file storage
+*/
+   private final File storageDir;
+
+   private final AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+   /** Shutdown hook thread to ensure deletion of the local storage 
directory. */
+   private final Thread shutdownHook;
+
+   /** The number of retries when the transfer fails */
+   private final int numFetchRetries;
+
+   /** Configuration for the blob client like ssl parameters required to 
connect to the blob server */
+   private final Configuration blobClientConfig;
+
+   /** Lock guarding concurrent file accesses */
+   private final ReadWriteLock readWriteLock;
+
+   /**
+* Instantiates a new BLOB cache.
+*
+* @param serverAddress
+*  address of the {@link BlobServer} to use for fetching 
files from
+* @param blobClientConfig
+*  global configuration
+*
+* @throws IOException
+*  thrown if the (local or distributed) file storage 
cannot be created or is not usable
+*/
+   public TransientBlobCache(
+   final InetSocketAddress serverAddress,
+   final Configuration blobClientConfig) throws 
IOException {
--- End diff --

Can we change it such that we don't pass in a `Configuration` object but 
instead the required values?
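
A hedged sketch of what a Configuration-free constructor could look like; the parameter list below is guessed from the fields shown in the diff and is not the actual Flink API:

{code:java}
import java.io.File;
import java.net.InetSocketAddress;
import java.util.Objects;

/** Hypothetical sketch of a Configuration-free constructor. */
class TransientBlobCacheSketch {

    private final InetSocketAddress serverAddress;
    private final File storageDir;
    private final int numFetchRetries;

    TransientBlobCacheSketch(InetSocketAddress serverAddress, File storageDir, int numFetchRetries) {
        // the caller would extract these values from the Configuration once, at the call site
        this.serverAddress = Objects.requireNonNull(serverAddress);
        this.storageDir = Objects.requireNonNull(storageDir);
        if (numFetchRetries < 0) {
            throw new IllegalArgumentException("numFetchRetries must be non-negative");
        }
        this.numFetchRetries = numFetchRetries;
    }
}
{code}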


---


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155569#comment-16155569
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137291887
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides access to transient BLOB files stored at the {@link 
BlobServer}.
+ *
+ * TODO: currently, this is still cache-based with local copies - make 
this truly transient, i.e. return file streams with no local copy
+ */
+public class TransientBlobCache implements TransientBlobService {
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TransientBlobCache.class);
+
+   /** Counter to generate unique names for temporary files. */
+   private final AtomicLong tempFileCounter = new AtomicLong(0);
+
+   private final InetSocketAddress serverAddress;
+
+   /**
+* Root directory for local file storage
+*/
+   private final File storageDir;
+
+   private final AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+   /** Shutdown hook thread to ensure deletion of the local storage 
directory. */
+   private final Thread shutdownHook;
+
+   /** The number of retries when the transfer fails */
+   private final int numFetchRetries;
+
+   /** Configuration for the blob client like ssl parameters required to 
connect to the blob server */
+   private final Configuration blobClientConfig;
+
+   /** Lock guarding concurrent file accesses */
+   private final ReadWriteLock readWriteLock;
+
+   /**
+* Instantiates a new BLOB cache.
+*
+* @param serverAddress
+*  address of the {@link BlobServer} to use for fetching 
files from
+* @param blobClientConfig
+*  global configuration
+*
+* @throws IOException
+*  thrown if the (local or distributed) file storage 
cannot be created or is not usable
+*/
+   public TransientBlobCache(
+   final InetSocketAddress serverAddress,
+   final Configuration blobClientConfig) throws 
IOException {
--- End diff --

Probably not so easy because we also need it for the `BlobClient` creation.


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cas

[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155563#comment-16155563
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137259798
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java ---
@@ -257,7 +265,88 @@ public void testBlobServerRecovery() throws Exception {
try {
blobStoreService = 
BlobUtils.createBlobStoreFromConfig(config);
 
-   BlobRecoveryITCase.testBlobServerRecovery(config, 
blobStoreService);
+   BlobServerRecoveryTest.testBlobServerRecovery(config, 
blobStoreService);
+   } finally {
+   if (blobStoreService != null) {
+   blobStoreService.closeAndCleanupAllData();
+   }
+   }
+   }
+
+   /**
+* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed 
corrupted JARs are
+* recognised during the download via a {@link 
org.apache.flink.runtime.blob.BlobServer}.
+*/
+   @Test
+   public void testBlobServerCorruptedFile() throws Exception {
+   org.apache.flink.configuration.Configuration
+   config = new 
org.apache.flink.configuration.Configuration();
+   config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+   config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
+   config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   temporaryFolder.newFolder().getAbsolutePath());
+   config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
hdfsURI);
+
+   BlobStoreService blobStoreService = null;
+
+   try {
+   blobStoreService = 
BlobUtils.createBlobStoreFromConfig(config);
--- End diff --

Why not create the `blobStoreService` outside of the try-finally block? 
Then you don't have to do the null check in the finally block.


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137266511
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -220,19 +260,27 @@ InputStream getInternal(@Nullable JobID jobId, 
BlobKey blobKey) throws IOExcepti
 *  ID of the job this blob belongs to (or null if 
job-unrelated)
 * @param blobKey
 *  blob key associated with the requested file
+* @param permanentBlob
+*  whether the BLOB is permanent (true) or 
transient (false)
 *
 * @throws IOException
 * thrown if an I/O error occurs while writing the header data 
to the output stream
 */
-   private static void sendGetHeader(OutputStream outputStream, @Nullable 
JobID jobId, BlobKey blobKey) throws IOException {
+   private static void sendGetHeader(
+   OutputStream outputStream, @Nullable JobID jobId, 
BlobKey blobKey, boolean permanentBlob)
+   throws IOException {
checkNotNull(blobKey);
+   checkArgument(jobId != null || !permanentBlob, "permanent BLOBs 
must be job-related");
 
// Signal type of operation
outputStream.write(GET_OPERATION);
 
// Send job ID and key
if (jobId == null) {
outputStream.write(CONTENT_NO_JOB);
+   } else if (permanentBlob) {
+   outputStream.write(CONTENT_FOR_JOB_HA);
--- End diff --

Does it make sense to rename this constant to `PERMANENT_JOB_CONTENT`?


---


[GitHub] flink pull request #4358: [FLINK-7068][blob] change BlobService sub-classes ...

2017-09-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r137259367
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java ---
@@ -257,7 +265,88 @@ public void testBlobServerRecovery() throws Exception {
try {
blobStoreService = 
BlobUtils.createBlobStoreFromConfig(config);
 
-   BlobRecoveryITCase.testBlobServerRecovery(config, 
blobStoreService);
+   BlobServerRecoveryTest.testBlobServerRecovery(config, 
blobStoreService);
+   } finally {
+   if (blobStoreService != null) {
+   blobStoreService.closeAndCleanupAllData();
+   }
+   }
+   }
+
+   /**
+* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed 
corrupted JARs are
+* recognised during the download via a {@link 
org.apache.flink.runtime.blob.BlobServer}.
+*/
+   @Test
+   public void testBlobServerCorruptedFile() throws Exception {
+   org.apache.flink.configuration.Configuration
+   config = new 
org.apache.flink.configuration.Configuration();
+   config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+   config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
--- End diff --

I think "ZooKeeper" is not a valid state backend. What did you want to do 
with that?
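
If a state backend setting is needed at all in this test (an assumption on my part), a valid identifier could be used instead, e.g.:

{code:java}
// assumption: "filesystem" (or "jobmanager") is a valid state backend identifier, unlike "ZOOKEEPER"
config.setString(CoreOptions.STATE_BACKEND, "filesystem");
{code}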


---


[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155541#comment-16155541
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4239
  
What were the bugs that you fixed?


> Add Apache Kafka 0.11 connector
> ---
>
> Key: FLINK-6988
> URL: https://issues.apache.org/jira/browse/FLINK-6988
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.3.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> Kafka 0.11 (it will be released very soon) add supports for transactions. 
> Thanks to that, Flink might be able to implement Kafka sink supporting 
> "exactly-once" semantic. API changes and whole transactions support is 
> described in 
> [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging].
> The goal is to mimic implementation of existing BucketingSink. New 
> FlinkKafkaProducer011 would 
> * upon creation begin transaction, store transaction identifiers into the 
> state and would write all incoming data to an output Kafka topic using that 
> transaction
> * on `snapshotState` call, it would flush the data and write in state 
> information that current transaction is pending to be committed
> * on `notifyCheckpointComplete` we would commit this pending transaction
> * in case of crash between `snapshotState` and `notifyCheckpointComplete` we 
> either abort this pending transaction (if not every participant successfully 
> saved the snapshot) or restore and commit it. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
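
The FLINK-6988 description above outlines a transactional, two-phase-commit style sink. As a hedged illustration only, the lifecycle it describes maps onto the Kafka 0.11 producer API roughly as follows; this is a greatly simplified sketch with hypothetical class and method names, not Flink's actual FlinkKafkaProducer011:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/** Simplified, hypothetical sketch of the transactional sink lifecycle described above. */
class TransactionalSinkSketch {

    private final KafkaProducer<byte[], byte[]> producer;
    private final String topic;

    TransactionalSinkSketch(Properties kafkaProps, String topic) {
        // kafkaProps must contain bootstrap.servers, byte-array serializers and a transactional.id
        this.producer = new KafkaProducer<>(kafkaProps);
        this.topic = topic;
        producer.initTransactions();
        producer.beginTransaction();            // begin a transaction upon creation
    }

    void invoke(byte[] value) {
        producer.send(new ProducerRecord<>(topic, value)); // write within the open transaction
    }

    void snapshotState() {
        producer.flush();                       // flush pending records; the open transaction's
                                                // identifier would be written into Flink state
    }

    void notifyCheckpointComplete() {
        producer.commitTransaction();           // commit the pending transaction ...
        producer.beginTransaction();            // ... and start a new one for subsequent records
    }

    void abortOnFailedRecovery() {
        producer.abortTransaction();            // abort if the snapshot was not completed everywhere
    }
}
{code}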


[GitHub] flink issue #4239: [FLINK-6988] flink-connector-kafka-0.11 with exactly-once...

2017-09-06 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4239
  
What were the bugs that you fixed?


---


[GitHub] flink pull request #4575: [FLINK-7494][travis] Add license headers to '.trav...

2017-09-06 Thread yew1eb
Github user yew1eb commented on a diff in the pull request:

https://github.com/apache/flink/pull/4575#discussion_r137297560
  
--- Diff: .travis.yml ---
@@ -1,4 +1,19 @@
-# s3 deployment based on 
http://about.travis-ci.org/blog/2012-12-18-travis-artifacts/
--- End diff --

done.


---

