[GitHub] flink issue #6373: [FLINK-9838][logging] Don't log slot request failures on ...

2018-07-23 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/6373
  
Ran into this bug too.

+1 for the fix


---


[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...

2018-07-12 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r202199865
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 ---
@@ -134,6 +134,10 @@ public SentinelSequenceNumber 
toSentinelSequenceNumber() {
/** The interval between each attempt to discover new shards. */
public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = 
"flink.shard.discovery.intervalmillis";
 
+   /** The config to turn on adaptive reads from a shard. */
+   public static final String SHARD_USE_ADAPTIVE_READS = 
"flink.shard.use.adaptive.reads";
--- End diff --

[Most of Flink's feature 
flags](https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html) 
are named `xx.enabled`, so I'd suggest renaming it to something like 
`flink.shard.adaptive.read.records.enabled`
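
For illustration, a sketch of what the renamed constant could look like (the key name is only a suggestion, not a settled API):

```
/** Hypothetical rename -- the key name is a suggestion only. */
public static final String SHARD_USE_ADAPTIVE_READS =
	"flink.shard.adaptive.read.records.enabled";
```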


---


[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...

2018-07-12 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r202201507
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, 
int maxNumberOfRecords) thr
protected static List deaggregateRecords(List 
records, String startingHashKey, String endingHashKey) {
return UserRecord.deaggregate(records, new 
BigInteger(startingHashKey), new BigInteger(endingHashKey));
}
+
+   /**
+* Adapts the maxNumberOfRecordsPerFetch based on the current average 
record size
+* to optimize 2 Mb / sec read limits.
+*
+* @param averageRecordSizeBytes
+* @return adaptedMaxRecordsPerFetch
+*/
+
+   protected int getAdaptiveMaxRecordsPerFetch(long 
averageRecordSizeBytes) {
+   int adaptedMaxRecordsPerFetch = maxNumberOfRecordsPerFetch;
+   if (averageRecordSizeBytes != 0 && fetchIntervalMillis != 0) {
+   adaptedMaxRecordsPerFetch = (int) 
(KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / (averageRecordSizeBytes * 1000L / 
fetchIntervalMillis));
+
+   // Ensure the value is not more than 1L
+   adaptedMaxRecordsPerFetch = 
adaptedMaxRecordsPerFetch <= 
ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX ?
--- End diff --

adaptedMaxRecordsPerFetch = Math.min(adaptedMaxRecordsPerFetch, 
ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
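
For illustration, a hedged sketch of the whole method rewritten with `Math.min` (field and constant names follow the diff above; this is a suggestion, not the PR's final code):

```
// Sketch only -- names follow the quoted diff.
protected int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {
	int adaptedMaxRecordsPerFetch = maxNumberOfRecordsPerFetch;
	if (averageRecordSizeBytes != 0 && fetchIntervalMillis != 0) {
		// records per fetch so that (records * avg size * fetches/sec) stays under the 2 MB/s limit;
		// fetches/sec = 1000 / fetchIntervalMillis
		adaptedMaxRecordsPerFetch = (int) (KINESIS_SHARD_BYTES_PER_SECOND_LIMIT
				/ (averageRecordSizeBytes * 1000L / fetchIntervalMillis));
		// cap at the GetRecords maximum instead of using a ternary
		adaptedMaxRecordsPerFetch = Math.min(adaptedMaxRecordsPerFetch,
				ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
	}
	return adaptedMaxRecordsPerFetch;
}
```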


---


[GitHub] flink pull request #6319: [FLINK-9822] Add Dockerfile for StandaloneJobClust...

2018-07-12 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6319#discussion_r202198679
  
--- Diff: flink-container/docker/README.md ---
@@ -0,0 +1,44 @@
+# Apache Flink cluster deployment on docker using docker-compose
--- End diff --

Does this apply to both standalone and cluster mode? I'd like to get this 
clarified, since the PR title says it's for StandaloneJobCluster


---


[GitHub] flink pull request #6319: [FLINK-9822] Add Dockerfile for StandaloneJobClust...

2018-07-12 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6319#discussion_r202197686
  
--- Diff: flink-container/docker/README.md ---
@@ -0,0 +1,44 @@
+# Apache Flink cluster deployment on docker using docker-compose
+
+## Installation
+
+Install the most recent stable version of docker
+https://docs.docker.com/installation/
+
+## Build
+
+Images are based on the official Java Alpine (OpenJDK 8) image. If you 
want to
+build the flink image run:
+
+sh build.sh --job-jar /path/to/job/jar/job.jar --image-name flink:job
+
+or
+
+docker build -t flink .
+
+If you want to build the container for a specific version of 
flink/hadoop/scala
+you can configure it in the respective args:
+
+docker build --build-arg FLINK_VERSION=1.0.3 --build-arg 
HADOOP_VERSION=26 --build-arg SCALA_VERSION=2.10 -t 
"flink:1.0.3-hadoop2.6-scala_2.10" flink
--- End diff --

Is FLINK_VERSION 1.0.3 only for demo purposes? Can we use a more recent 
version for the demo?


---


[GitHub] flink issue #6109: [FLINK-9483] 'Building Flink' doc doesn't highlight quick...

2018-07-10 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/6109
  
cc @zentol 


---


[GitHub] flink pull request #6277: [FLINK-9511] Implement TTL config

2018-07-09 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6277#discussion_r201216406
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
 ---
@@ -93,4 +97,82 @@ public Time getTtl() {
public TtlTimeCharacteristic getTimeCharacteristic() {
return timeCharacteristic;
}
+
+   @Override
+   public String toString() {
+   return "StateTtlConfiguration{" +
+   "ttlUpdateType=" + ttlUpdateType +
+   ", stateVisibility=" + stateVisibility +
+   ", timeCharacteristic=" + timeCharacteristic +
+   ", ttl=" + ttl +
+   '}';
+   }
+
+   public static Builder newBuilder(Time ttl) {
+   return new Builder(ttl);
+   }
+
+   /**
+* Builder for the {@link StateTtlConfiguration}.
+*/
+   public static class Builder {
+
+   private TtlUpdateType ttlUpdateType = OnCreateAndWrite;
+   private TtlStateVisibility stateVisibility = NeverReturnExpired;
+   private TtlTimeCharacteristic timeCharacteristic = 
ProcessingTime;
+   private Time ttl;
+
+   public Builder(Time ttl) {
--- End diff --

Should `TimeCharacteristic` be a builder's constructor param as well?  
Otherwise, users may not notice they have to set it, which can easily lead to 
confusion
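
For illustration, a minimal sketch of such a constructor (field names follow the diff above; this is a suggestion, not the final API):

```
// Sketch only -- makes the time characteristic explicit at construction time.
public Builder(Time ttl, TtlTimeCharacteristic timeCharacteristic) {
	this.ttl = ttl;
	this.timeCharacteristic = timeCharacteristic;
}
```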


---


[GitHub] flink issue #6290: [Flink-9691] [Kinesis Connector] Attempt to call getRecor...

2018-07-09 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/6290
  
+1, LGTM


---


[GitHub] flink issue #6195: [FLINK-9543][METRICS] Expose JobMaster ID to metric syste...

2018-07-04 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/6195
  
Can you also add documentation to the Metrics page?


---


[GitHub] flink pull request #6130: [FLINK-9545] Support read a file multiple times in...

2018-06-15 Thread bowenli86
Github user bowenli86 closed the pull request at:

https://github.com/apache/flink/pull/6130


---


[GitHub] flink issue #6156: [FLINK-9572] Extend InternalAppendingState with internal ...

2018-06-13 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/6156
  
+1 LGTM


---


[GitHub] flink issue #6130: [FLINK-9545] Support read a file multiple times in Flink ...

2018-06-13 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/6130
  
Not really. It's not about having n copies of the data. One use case: a 
file-fed stream pipeline usually runs very fast and yields inadequate metrics, 
so users need to run it end-to-end for a longer time to gather stable metrics 
and tune all components in the pipeline.

I'll close it if the community is not interested.


---


[GitHub] flink issue #6130: [FLINK-9545] Support read a file multiple times in Flink ...

2018-06-07 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/6130
  
@aljoscha 

Motivation: We have a requirement to read a bunch of files, each multiple 
times, to feed our streams

Specifically, we need `StreamExecutionEnvironment.readFile/readTextFile` to 
be able to read a file a specified `N` times, but currently it only 
supports reading a file once.

We've implemented this internally. It would be good to contribute it back to 
the community version. This JIRA adds support for the feature. 


---


[GitHub] flink pull request #6130: [FLINK-9545] Support read a file multiple times in...

2018-06-06 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/6130

[FLINK-9545] Support read a file multiple times in Flink DataStream

## What is the purpose of the change

we need `StreamExecutionEnvironment.readFile/readTextFile` to read each 
file N times, but currently it only supports reading each file once.

This PR adds support for the feature.

## Brief change log

- add a new processing mode, PROCESSING_N_TIMES
- add an additional parameter, numTimes, for 
StreamExecutionEnvironment.readFile/readTextFile (see the sketch below)
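
For illustration, a hedged sketch of how the proposed parameter might be used (the `numTimes` overload is this PR's proposal, not an existing API; the exact signature may differ):

```
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadNTimesExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// Hypothetical overload: read the file end-to-end 3 times before the source finishes.
		DataStream<String> lines = env.readTextFile("file:///path/to/input.txt", 3);
		lines.print();
		env.execute("read-n-times");
	}
}
```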

## Verifying this change

This change is already covered by existing tests, such as *(please describe 
tests)*.

## Does this pull request potentially affect one of the following parts:

  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs / JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-9545

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6130.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6130


commit d51fd25ca0ff8e38aaf84d2076c9c979cd136c9d
Author: Bowen Li 
Date:   2018-06-07T00:12:59Z

[FLINK-9545] Support read a file multiple times in Flink DataStream




---


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-06-04 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r192834879
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+public class TimeoutLatch {
+
+   private final Object lock = new Object();
+   private volatile boolean waiting;
+
+   public void await(long timeout) throws InterruptedException {
+   synchronized (lock) {
+   waiting = true;
+   lock.wait(timeout);
+   }
+   }
+
+   public void trigger() {
+   if (waiting) {
+   synchronized (lock) {
+   waiting = false;
--- End diff --

needs another `if (waiting)` check here inside the synchronized block, to 
ensure no one chimes in between lines 34 and 35
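
A minimal sketch of the double-checked version, assuming the truncated body above goes on to notify the lock after clearing the flag:

```
public void trigger() {
	if (waiting) {
		synchronized (lock) {
			// re-check under the lock so a concurrent await() can't slip in
			// between the outer check and acquiring the monitor
			if (waiting) {
				waiting = false;
				lock.notifyAll();
			}
		}
	}
}
```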


---


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-06-04 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r192837000
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -180,9 +204,16 @@ public void open(Configuration parameters) throws 
Exception {
KinesisProducerConfiguration producerConfig = 
KinesisConfigUtil.getValidatedProducerConfiguration(configProps);
 
producer = getKinesisProducer(producerConfig);
+
+   final MetricGroup kinesisMectricGroup = 
getRuntimeContext().getMetricGroup().addGroup("kinesisProducer");
--- End diff --

minor: it would be better to make these three strings constants (`static 
final String`) for easier maintenance.
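
For illustration, a hedged sketch (only "kinesisProducer" appears in the quoted diff; the other two names are placeholders for the remaining strings):

```
// Illustrative names only -- actual constants would match the strings in the diff.
private static final String KINESIS_PRODUCER_METRIC_GROUP = "kinesisProducer";
private static final String METRIC_OUTSTANDING_RECORDS_COUNT = "outstandingRecordsCount";
private static final String METRIC_BACKPRESSURE_CYCLES = "backpressureCycles";
```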


---


[GitHub] flink issue #6116: [FLINK-9498][build] Disable dependency convergence for fl...

2018-06-04 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/6116
  
Will it lower the chances of detecting library version conflicts among 
Flink's dependencies?


---


[GitHub] flink pull request #6109: [FLINK-9483] 'Building Flink' doc doesn't highligh...

2018-06-03 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6109#discussion_r192628940
  
--- Diff: docs/start/building.md ---
@@ -50,7 +50,11 @@ mvn clean install -DskipTests
 
 This instructs [Maven](http://maven.apache.org) (`mvn`) to first remove 
all existing builds (`clean`) and then create a new Flink binary (`install`).
 
-To speed up the build you can skip tests, checkstyle, and JavaDocs: `mvn 
clean install -DskipTests -Dmaven.javadoc.skip=true -Dcheckstyle.skip=true`.
+To speed up the build you can skip tests, checkstyle, and JavaDocs:
+
+{% highlight bash %}
+mvn clean install -DskipTests -Dmaven.javadoc.skip=true 
-Dcheckstyle.skip=true
--- End diff --


https://github.com/apache/flink/blob/56df6904688642b1c8f9a287646c163dfae7edfd/pom.xml#L639


---


[GitHub] flink pull request #6109: [FLINK-9483] 'Building Flink' doc doesn't highligh...

2018-06-01 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6109#discussion_r192535262
  
--- Diff: docs/start/building.md ---
@@ -50,7 +50,11 @@ mvn clean install -DskipTests
 
 This instructs [Maven](http://maven.apache.org) (`mvn`) to first remove 
all existing builds (`clean`) and then create a new Flink binary (`install`).
 
-To speed up the build you can skip tests, checkstyle, and JavaDocs: `mvn 
clean install -DskipTests -Dmaven.javadoc.skip=true -Dcheckstyle.skip=true`.
+To speed up the build you can skip tests, checkstyle, and JavaDocs:
--- End diff --

What's `QA-plugins`? Do you mean the set of QA checks in 
`tools/qa-check.sh`?


---


[GitHub] flink pull request #6109: [FLINK-9483] 'Building Flink' doc doesn't highligh...

2018-05-31 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/6109

[FLINK-9483] 'Building Flink' doc doesn't highlight quick build command

## What is the purpose of the change

The blue part isn't correctly highlighted like the red ones:

![screen shot 2018-05-31 at 4 12 32 
pm](https://user-images.githubusercontent.com/1892692/40813097-ff439950-64ed-11e8-8a10-512934d89116.png)

## Brief change log

Highlight quick build command

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-9483

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6109.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6109


commit 8ea6f791de0266f481c45d03731d15ed999ea753
Author: Bowen Li 
Date:   2018-05-31T23:15:41Z

[FLINK-9483] 'Building Flink' doc doesn't highlight quick build command




---


[GitHub] flink pull request #5649: [FLINK-8873] [DataStream API] [Tests] move unit te...

2018-05-31 Thread bowenli86
Github user bowenli86 closed the pull request at:

https://github.com/apache/flink/pull/5649


---


[GitHub] flink pull request #6104: [FLINK-9476]Emit late elements in CEP as sideOutPu...

2018-05-30 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6104#discussion_r191858415
  
--- Diff: docs/dev/libs/cep.md ---
@@ -1524,7 +1524,52 @@ In `CEP` the order in which elements are processed 
matters. To guarantee that el
 
 To guarantee that elements across watermarks are processed in event-time 
order, Flink's CEP library assumes
 *correctness of the watermark*, and considers as *late* elements whose 
timestamp is smaller than that of the last
-seen watermark. Late elements are not further processed.
+seen watermark. Late elements are not further processed. Also, you can 
specify a sideOutput tag to collect the late elements come after the last seen 
watermark, you can use it like this.
+
+
+
+
+{% highlight java %}
+PatternStream patternStream = CEP.pattern(input, pattern);
+
+OutputTag lataDataOutputTag = new 
OutputTag("lata-data""){};
+
+OutputTag outputTag = new OutputTag("side-output""){};
+
+SingleOutputStreamOperator result = patternStream
+.sideOutputLateData(lataDataOutputTag)
+.select(
+new PatternTimeoutFunction() {...},
+outputTag,
+new PatternSelectFunction() {...}
+);
+
+DataStream lataData = result.getSideOutput(lataDataOutputTag);
--- End diff --

typo: lateData


---


[GitHub] flink pull request #6104: [FLINK-9476]Emit late elements in CEP as sideOutPu...

2018-05-30 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6104#discussion_r191857843
  
--- Diff: docs/dev/libs/cep.md ---
@@ -1524,7 +1524,52 @@ In `CEP` the order in which elements are processed 
matters. To guarantee that el
 
 To guarantee that elements across watermarks are processed in event-time 
order, Flink's CEP library assumes
 *correctness of the watermark*, and considers as *late* elements whose 
timestamp is smaller than that of the last
-seen watermark. Late elements are not further processed.
+seen watermark. Late elements are not further processed. Also, you can 
specify a sideOutput tag to collect the late elements come after the last seen 
watermark, you can use it like this.
+
+
+
+
+{% highlight java %}
+PatternStream patternStream = CEP.pattern(input, pattern);
+
+OutputTag lataDataOutputTag = new 
OutputTag("lata-data""){};
--- End diff --

typo: "lateDataOutputTag" and "late-data" 


---


[GitHub] flink issue #5649: [FLINK-8873] [DataStream API] [Tests] move unit tests of ...

2018-05-30 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5649
  
When I was developing KeyedProcessFunction, I initially wondered why 
there were no tests for KeyedStream, then researched and realized that they 
were actually mixed in with the DataStream tests. 

I think having that clarity by separating those tests would be great. That 
said, I also agree it doesn't hurt that much to keep them as-is. If you feel 
strongly against it, I can close this PR




---


[GitHub] flink issue #6097: [FLINK-9470] Allow querying the key in KeyedProcessFuncti...

2018-05-29 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/6097
  
+1. I might have forgotten to add the interfaces back then; it would be good 
to have them


---


[GitHub] flink pull request #6066: [FLINK-9428] [checkpointing] Allow operators to fl...

2018-05-23 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6066#discussion_r190342404
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
 ---
@@ -93,6 +93,20 @@
//  state snapshots
// 

 
+   /**
--- End diff --

This description feels like it states well what the method is not intended 
for, but doesn't clearly describe what it is intended for. 

It would be great if you could add here what you wrote in the ticket 
description: "Some operators maintain some small transient state ... Rather 
than persisting that state in a checkpoint, it can make sense to flush the 
data downstream"


---


[GitHub] flink issue #5978: [FLINK-8554] Upgrade AWS SDK

2018-05-18 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5978
  
Is there anything in `1.11.325` that we desperately need?

If not, I would oppose upgrading the AWS SDK this frequently. It's highly 
likely that we don't need any of the new changes in `1.11.325`. As you can 
see, the current SDK version is `1.11.319`, which was upgraded just a few 
days ago. 

There are a few reasons we should discourage it:

- It doesn't add much value, and we don't really need it
- It costs a lot of unnecessary work from both contributors and the Flink 
community (committers, reviewers, etc.)
- AWS releases their SDK very frequently, at a much faster pace than we can 
possibly keep up with




---


[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...

2018-05-18 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/6021
  
@tzulitai adding docs to educate users on tuning KPL performance would be 
good. I have quite some experience with it (as you may know :)  Ping me if 
you start working on it before I do, and I'll be glad to help contribute


---


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-05-18 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r189174902
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws 
Exception {
}
}
 
+   /**
+* If the internal queue of the {@link KinesisProducer} gets too long,
+* flush some of the records until we are below the limit again.
+* We don't want to flush _all_ records at this point since that would
+* break record aggregation.
+*/
+   private void checkQueueLimit() {
--- End diff --

probably rename it to something different, e.g. `enforceQueueLimit()`? It 
clearly does more than just 'check'


---


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-05-18 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r189175770
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws 
Exception {
}
}
 
+   /**
+* If the internal queue of the {@link KinesisProducer} gets too long,
+* flush some of the records until we are below the limit again.
+* We don't want to flush _all_ records at this point since that would
+* break record aggregation.
+*/
+   private void checkQueueLimit() {
+   while (producer.getOutstandingRecordsCount() >= queueLimit) {
+   producer.flush();
--- End diff --

Looks like 
[KinesisProducer](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java)
 doesn't have a way to get the child process's callback. Or maybe I 
misunderstood your proposal, Gordon? 


---


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-05-18 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r189176708
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws 
Exception {
}
}
 
+   /**
+* If the internal queue of the {@link KinesisProducer} gets too long,
+* flush some of the records until we are below the limit again.
+* We don't want to flush _all_ records at this point since that would
+* break record aggregation.
+*/
+   private void checkQueueLimit() {
+   while (producer.getOutstandingRecordsCount() >= queueLimit) {
--- End diff --

A more important thing I would count and log here is how many times it has 
already tried to flush within a single call of `enforceQueueLimit()`. We 
could set a threshold, say 10 attempts, and then log a message saying that 
KPL is causing backpressure
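
For illustration, a hedged sketch of such counting (field names follow the quoted code; the threshold and log message are illustrative only):

```
// Sketch only -- threshold and message are illustrative.
private void enforceQueueLimit() {
	int attempt = 0;
	while (producer.getOutstandingRecordsCount() >= queueLimit) {
		producer.flush();
		// hypothetical threshold: after 10 flush attempts in one call, warn once
		if (++attempt == 10) {
			LOG.warn("Kinesis producer queue is not draining; KPL is causing backpressure.");
		}
		try {
			Thread.sleep(500);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			break;
		}
	}
}
```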


---


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-05-18 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r189176320
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws 
Exception {
}
}
 
+   /**
+* If the internal queue of the {@link KinesisProducer} gets too long,
+* flush some of the records until we are below the limit again.
+* We don't want to flush _all_ records at this point since that would
+* break record aggregation.
+*/
+   private void checkQueueLimit() {
+   while (producer.getOutstandingRecordsCount() >= queueLimit) {
+   producer.flush();
+   try {
+   Thread.sleep(500);
+   } catch (InterruptedException e) {
+   LOG.warn("Flushing was interrupted.");
--- End diff --

you can remove these two lines; they don't provide much value. After 
removal, it will be almost exactly how `KinesisProducer#flushSync` works:

```
// KinesisProducer.java
@Override
public void flushSync() {
while (getOutstandingRecordsCount() > 0) {
flush();
try {
Thread.sleep(500);
} catch (InterruptedException e) { }
}
}
```


---


[GitHub] flink issue #5649: [FLINK-8873] [DataStream API] [Tests] move unit tests of ...

2018-05-17 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5649
  
Hi @kl0u , can you pls take a look at this PR? 


---


[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...

2018-05-10 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5979
  
LGTM +1


---


[GitHub] flink pull request #5979: [FLINK-9070][state]improve the performance of Rock...

2018-05-10 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5979#discussion_r187255609
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 ---
@@ -356,6 +369,20 @@ private UV deserializeUserValue(byte[] rawValueBytes, 
TypeSerializer valueSe
return isNull ? null : valueSerializer.deserialize(in);
}
 
+   private boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] 
rawKeyBytes) {
+   if (rawKeyBytes.length < keyPrefixBytes.length) {
+   return false;
+   }
+
+   for (int i = keyPrefixBytes.length; --i >= 
backend.getKeyGroupPrefixBytes(); ) {
--- End diff --

I recommend moving `--i` to the update clause of the `for` loop, instead 
of keeping it in the termination condition
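
A sketch of the suggested form, with the body elided since the diff cuts off at the loop header:

```
// Same traversal, with the decrement moved to the update clause.
for (int i = keyPrefixBytes.length - 1; i >= backend.getKeyGroupPrefixBytes(); --i) {
	// ... loop body unchanged from the diff ...
}
```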


---


[GitHub] flink pull request #5949: [FLINK-9288][docs] clarify the event time / waterm...

2018-05-02 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5949#discussion_r185588020
  
--- Diff: docs/dev/event_time.md ---
@@ -35,30 +35,32 @@ Flink supports different notions of *time* in streaming 
programs.
 respective operation.
 
 When a streaming program runs on processing time, all time-based 
operations (like time windows) will
-use the system clock of the machines that run the respective operator. 
For example, an hourly
+use the system clock of the machines that run the respective operator. 
An hourly
 processing time window will include all records that arrived at a 
specific operator between the
-times when the system clock indicated the full hour.
+times when the system clock indicated the full hour. For example, if 
an application
+begins running at 9:15am, the first hourly processing time window will 
include events
+processed between 9:15am and 10:00am, the next window will include 
events processed between 10:00am and 11:00am, and so on.
 
 Processing time is the simplest notion of time and requires no 
coordination between streams and machines.
 It provides the best performance and the lowest latency. However, in 
distributed and asynchronous
 environments processing time does not provide determinism, because it 
is susceptible to the speed at which
-records arrive in the system (for example from the message queue), and 
to the speed at which the
-records flow between operators inside the system.
+records arrive in the system (for example from the message queue), to 
the speed at which the
+records flow between operators inside the system, and to outages 
(scheduled, or otherwise).
 
 - **Event time:** Event time is the time that each individual event 
occurred on its producing device.
-This time is typically embedded within the records before they enter 
Flink and that *event timestamp*
-can be extracted from the record. An hourly event time window will 
contain all records that carry an
-event timestamp that falls into that hour, regardless of when the 
records arrive, and in what order
-they arrive.
+This time is typically embedded within the records before they enter 
Flink, and that *event timestamp*
+can be extracted from each record. An hourly event time window will 
contain all records that carry an
--- End diff --

better to mention allowed lateness here. “...will contain all records, ..., 
regardless of when the records arrive” sounds too absolute; the guarantee can 
only be achieved within the configured allowed lateness


---


[GitHub] flink pull request #5937: [FLINK-9270] Upgrade RocksDB to 5.11.3, and resolv...

2018-04-28 Thread bowenli86
Github user bowenli86 closed the pull request at:

https://github.com/apache/flink/pull/5937


---


[GitHub] flink pull request #5937: [FLINK-9270] Upgrade RocksDB to 5.11.3, and resolv...

2018-04-28 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5937

[FLINK-9270] Upgrade RocksDB to 5.11.3, and resolve concurrent test 
invocation problem of @RetryOnFailure

## What is the purpose of the change

- Upgrade RocksDB to 5.11.3 to pick up the latest bug fixes
- Besides, I found that unit tests annotated with `@RetryOnFailure` will be 
run concurrently if there's only a `try` clause without a following `catch`. 
For example, sometimes `RocksDBPerformanceTest.testRocksDbMergePerformance()` 
will actually be running in 3 concurrent invocations, and multiple concurrent 
writes to RocksDB result in errors. 

## Brief change log

- Upgrade RocksDB to 5.11.3
- For all RocksDB performance tests, add a `catch` clause to follow `try`

This change is already covered by existing tests

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-9270

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5937.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5937


commit 9fed7b9cba78dfdc6512818d9c2c07fc80892d72
Author: Bowen Li <bowenli86@...>
Date:   2018-04-28T08:32:09Z

[FLINK-9270] Upgrade RocksDB to 5.11.3, and resolve concurrent test 
invocation problem of @RetryOnFailure




---


[GitHub] flink pull request #5913: [FLINK-9181] [docs] [sql-client] Add documentation...

2018-04-27 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5913#discussion_r184831365
  
--- Diff: docs/dev/table/sqlClient.md ---
@@ -0,0 +1,539 @@
+---
+title: "SQL Client"
+nav-parent_id: tableapi
+nav-pos: 100
+is_beta: true
+---
+
+
+
+Although Flink’s Table & SQL API allows to declare queries in the SQL 
language. A SQL query needs to be embedded within a table program that is 
written either in Java or Scala. The table program needs to be packaged with a 
build tool before it can be submitted to a cluster. This limits the usage of 
Flink to mostly Java/Scala programmers.
--- End diff --

minor: should be either `either in Java or in Scala` or `in either Java or 
Scala`


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-27 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184830451
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 ---
@@ -455,4 +455,5 @@ public StreamCompressionDecorator 
getKeyGroupCompressionDecorator() {
@VisibleForTesting
public abstract int numStateEntries();
 
+
--- End diff --

revert this?


---


[GitHub] flink pull request #5885: [FLINK-8715] Remove usage of StateDescriptor in st...

2018-04-27 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5885#discussion_r184831125
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ---
@@ -190,4 +197,12 @@ protected void writeKeyWithGroupAndNamespace(
RocksDBKeySerializationUtils.writeKey(key, keySerializer, 
keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
RocksDBKeySerializationUtils.writeNameSpace(namespace, 
namespaceSerializer, keySerializationStream, keySerializationDataOutputView, 
ambiguousKeyPossible);
}
+
+   protected V getDefaultValue() {
--- End diff --

this method is duplicated among some impl classes. We can move it to 
`InternalKvState` as a [default 
method](https://docs.oracle.com/javase/tutorial/java/IandI/defaultmethods.html).


---


[GitHub] flink issue #5887: [FLINK-6719] [docs] Add details about fault-tolerance of ...

2018-04-27 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5887
  
@fhueske updated! let me know how it looks now


---


[GitHub] flink issue #5932: [FLINK-9266][flink-connector-kinesis]Updates Kinesis conn...

2018-04-27 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5932
  
+1


---


[GitHub] flink issue #5910: [FLINK-8841] [state] Remove HashMapSerializer and use Map...

2018-04-25 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5910
  
+1


---


[GitHub] flink issue #5904: [FLINK-9249][build] Add convenience profile for skipping ...

2018-04-25 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5904
  
+1


---


[GitHub] flink pull request #5887: [FLINK-6719] Add details about fault-tolerance of ...

2018-04-21 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5887

[FLINK-6719] Add details about fault-tolerance of timers to ProcessFunction 
docs

## What is the purpose of the change

The fault-tolerance of timers is a frequently asked questions on the 
mailing lists. We should add details about the topic in the ProcessFunction 
docs.

## Brief change log

Added details about the topic in the ProcessFunction docs.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-6719

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5887.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5887


commit 324f68d6595444ae9a84ee1ccced645c51aab471
Author: Bowen Li <bowenli86@...>
Date:   2018-04-21T06:46:57Z

[FLINK-6719] Add details about fault-tolerance of timers to ProcessFunction 
docs




---


[GitHub] flink issue #5864: [FLINK-8661] Replace Collections.EMPTY_MAP with Collectio...

2018-04-18 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5864
  
+1


---


[GitHub] flink issue #5864: [FLINK-8661] Replace Collections.EMPTY_MAP with Collectio...

2018-04-18 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5864
  
does using Collections.EMPTY_MAP/EMPTY_SET lead to any warnings? 
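
For context, a small sketch of the difference in question (the raw constant produces a compile-time unchecked-assignment warning, not runtime logging):

```
import java.util.Collections;
import java.util.Map;

class EmptyMapExample {
	// Raw constant: compiles, but with an "unchecked assignment" warning.
	Map<String, Integer> raw = Collections.EMPTY_MAP;

	// Generic factory method: type is inferred, no warning.
	Map<String, Integer> typed = Collections.emptyMap();
}
```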


---


[GitHub] flink issue #5820: [hotfix] [DataStream API] [Scala] removed unused scala im...

2018-04-12 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5820
  
@zentol which Scala checkstyle Maven plugin are you looking at? I didn't find 
any available ones, and I'm afraid there might never be one.

scalastyle explicitly said that they will not support catching unused 
imports: https://github.com/scalastyle/scalastyle/issues/193


---


[GitHub] flink pull request #5820: [hotfix] removed unused scala imports

2018-04-05 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5820

[hotfix] removed unused scala imports

## What is the purpose of the change

removed unused scala imports

## Brief change log

removed unused scala imports

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink hotfix_2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5820.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5820


commit f6940a0bf0fd5ff487a63ffad29e4dc9cd7a970c
Author: Bowen Li <bowenli86@...>
Date:   2018-04-05T07:52:37Z

[hotfix] removed unused scala imports




---


[GitHub] flink pull request #5819: [FLINK-9140] [Build System] [scalastyle] simplify ...

2018-04-05 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5819

[FLINK-9140] [Build System] [scalastyle] simplify scalastyle configurations

## What is the purpose of the change

Simplifying `` to ``

## Brief change log

Simplifying `` to ``

## Verifying this change

This change is already covered by existing tests

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-9140

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5819.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5819


commit 8bcddf43805190665237325b1f2efd17a5b9f47f
Author: Bowen Li <bowenli86@...>
Date:   2018-04-05T07:42:50Z

[FLINK-9140] simplify scalastyle configurations




---


[GitHub] flink issue #5810: [FLINK-9127] [Core] Filesystem State Backend logged incor...

2018-04-05 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5810
  
The original logging is correct - the filesystem state backend is actually 
the memory state backend + filesystem checkpointing. No need to change the logging. 


---


[GitHub] flink pull request #5809: [FLINK-8697] [Kinesis Connector] Rename DummyFlink...

2018-04-03 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5809

[FLINK-8697] [Kinesis Connector] Rename DummyFlinkKafkaConsumer in 
KinesisDataFetcherTest

## What is the purpose of the change

`DummyFlinkKafkaConsumer` in `KinesisDataFetcherTest` should be named 
`DummyFlinkKinesisConsumer`

## Brief change log

Rename `DummyFlinkKafkaConsumer` to `DummyFlinkKinesisConsumer` in Kinesis 
tests

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8697

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5809.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5809


commit 6c836a62248b51c762558e87a3e80410a19262c0
Author: Bowen Li <bowenli86@...>
Date:   2018-04-03T20:53:23Z

[FLINK-8697] Rename DummyFlinkKafkaConsumer in Kinesis tests




---


[GitHub] flink issue #5800: [FLINK-8837] add @Experimental annotation and properly an...

2018-04-02 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5800
  
cc @StephanEwen @tillrohrmann @zentol 


---


[GitHub] flink issue #5760: [hotfix] [doc] update maven versions in building flink pa...

2018-04-02 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5760
  
cc @zentol 


---


[GitHub] flink pull request #5800: [FLINK-8837] add @Experimental annotation and prop...

2018-04-02 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5800

[FLINK-8837]  add @Experimental annotation and properly annotate some 
classes

## What is the purpose of the change

- add @Experimental annotation
- properly annotate some classes with @Experimental 

## Brief change log

- add @Experimental annotation
- properly annotate some classes with @Experimental 

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8837

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5800.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5800


commit ef729e91d29b47d68d38d4f08cfcc5f246d4dc34
Author: Bowen Li <bowenli86@...>
Date:   2018-04-02T07:59:34Z

[FLINK-8837]  add @Experimental annotation and properly annotate some 
classes




---


[GitHub] flink pull request #5760: [hotfix] [doc] fix maven version in building flink

2018-03-24 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5760

[hotfix] [doc] fix maven version in building flink

## What is the purpose of the change

The maven version in `start/building` is inconsistent. Make it consistent 
by changing the maven version to 3.0.4

## Brief change log

The maven version in `start/building` is inconsistent. Make it consistent 
by changing the maven version to 3.0.4

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5760.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5760


commit 8a17ccedf33bed267f4dcfeef185cc589fb70fe1
Author: Bowen Li <bli@...>
Date:   2018-03-24T15:11:16Z

[hotfix] fix maven version in building flink




---


[GitHub] flink issue #5702: [FLINK-8771] [Build System] [Checkstyle/Scalastyle] Upgra...

2018-03-23 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5702
  
Hi @zentol , can you take a look at this PR?


---


[GitHub] flink issue #5702: [FLINK-8771] [Build System] [Checkstyle/Scalastyle] Upgra...

2018-03-21 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5702
  
cc @zentol


---


[GitHub] flink pull request #5702: [FLINK-8771] Upgrade scalastyle to 1.0.0

2018-03-15 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5702

[FLINK-8771] Upgrade scalastyle to 1.0.0

## What is the purpose of the change

Upgrade scalastyle from 0.8.0 to 1.0.0

## Brief change log

- Upgrade scalastyle from 0.8.0 to 1.0.0
- Fixed some license style issues along the way, because scalastyle 1.0.0 
detected those as errors

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8771

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5702.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5702


commit 97160a70ae64e90cc101512ab0fb4c08ba79d924
Author: Bowen Li <bowenli86@...>
Date:   2018-03-15T00:26:18Z

[FLINK-8771] Upgrade scalastyle to 1.0.0




---


[GitHub] flink pull request #5356: [FLINK-8364][state backend] Add iterator() to List...

2018-03-14 Thread bowenli86
Github user bowenli86 closed the pull request at:

https://github.com/apache/flink/pull/5356


---


[GitHub] flink issue #5649: [FLINK-8873] [DataStream API] [Tests] move unit tests of ...

2018-03-14 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5649
  
cc @kl0u 


---


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-13 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174231064
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   @Test
+   public void basicTest() throws Exception {
+
+   List<Tuple2<byte[], byte[]>> data = new ArrayList<>(1);
+   for (int i = 0; i < 1; ++i) {
+   data.add(new Tuple2<>(("key:" + i).getBytes(), 
("value:" + i).getBytes()));
+   }
+
+   try (RocksDB db = 
RocksDB.open(folder.newFolder().getAbsolutePath());
+   WriteOptions options = new 
WriteOptions().setDisableWAL(true);
+   ColumnFamilyHandle handle = db.createColumnFamily(new 
ColumnFamilyDescriptor("test".getBytes()));
+   RocksDBWriteBatchWrapper writeBatchWrapper = new 
RocksDBWriteBatchWrapper(db, options, 200)) {
+
+   // insert data
+   for (Tuple2<byte[], byte[]> item : data) {
+   writeBatchWrapper.put(handle, item.f0, item.f1);
+   }
+   writeBatchWrapper.flush();
+
+   // valid result
+   for (Tuple2<byte[], byte[]> item : data) {
+   Assert.assertArrayEquals(item.f1, 
db.get(handle, item.f0));
+   }
+   }
+   }
+
+   @Test
+   @Ignore
+   public void benchMark() throws Exception {
+
+   // put with disableWAL=true VS put with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS put with 
disableWAL=false <--");
+   benchMarkHelper(1_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(10_000, true, WRITETYPE.PUT);
+   benchMarkHelper(10_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(100_000, true, WRITETYPE.PUT);
+   benchMarkHelper(100_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000_000, false, WRITETYPE.PUT);
+
+   // put with disableWAL=true VS write batch with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS write batch 
with disableWAL=false <--");
--- End diff --

replace the console output with logging; you can refer to 
`RocksDBListStatePerformanceTest.java`


---


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-13 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174230739
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   @Test
+   public void basicTest() throws Exception {
+
+   List<Tuple2<byte[], byte[]>> data = new ArrayList<>(1);
+   for (int i = 0; i < 1; ++i) {
+   data.add(new Tuple2<>(("key:" + i).getBytes(), 
("value:" + i).getBytes()));
+   }
+
+   try (RocksDB db = 
RocksDB.open(folder.newFolder().getAbsolutePath());
+   WriteOptions options = new 
WriteOptions().setDisableWAL(true);
+   ColumnFamilyHandle handle = db.createColumnFamily(new 
ColumnFamilyDescriptor("test".getBytes()));
+   RocksDBWriteBatchWrapper writeBatchWrapper = new 
RocksDBWriteBatchWrapper(db, options, 200)) {
+
+   // insert data
+   for (Tuple2<byte[], byte[]> item : data) {
+   writeBatchWrapper.put(handle, item.f0, item.f1);
+   }
+   writeBatchWrapper.flush();
+
+   // valid result
+   for (Tuple2<byte[], byte[]> item : data) {
+   Assert.assertArrayEquals(item.f1, 
db.get(handle, item.f0));
+   }
+   }
+   }
+
+   @Test
+   @Ignore
+   public void benchMark() throws Exception {
--- End diff --

Need to move the benchmark test to the 
`org.apache.flink.contrib.streaming.state.benchmark` package.


---


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-13 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174231142
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   @Test
+   public void basicTest() throws Exception {
+
+   List<Tuple2<byte[], byte[]>> data = new ArrayList<>(1);
+   for (int i = 0; i < 1; ++i) {
+   data.add(new Tuple2<>(("key:" + i).getBytes(), 
("value:" + i).getBytes()));
+   }
+
+   try (RocksDB db = 
RocksDB.open(folder.newFolder().getAbsolutePath());
+   WriteOptions options = new 
WriteOptions().setDisableWAL(true);
+   ColumnFamilyHandle handle = db.createColumnFamily(new 
ColumnFamilyDescriptor("test".getBytes()));
+   RocksDBWriteBatchWrapper writeBatchWrapper = new 
RocksDBWriteBatchWrapper(db, options, 200)) {
+
+   // insert data
+   for (Tuple2<byte[], byte[]> item : data) {
+   writeBatchWrapper.put(handle, item.f0, item.f1);
+   }
+   writeBatchWrapper.flush();
+
+   // valid result
+   for (Tuple2<byte[], byte[]> item : data) {
+   Assert.assertArrayEquals(item.f1, 
db.get(handle, item.f0));
+   }
+   }
+   }
+
+   @Test
+   @Ignore
+   public void benchMark() throws Exception {
+
+   // put with disableWAL=true VS put with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS put with 
disableWAL=false <--");
+   benchMarkHelper(1_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(10_000, true, WRITETYPE.PUT);
+   benchMarkHelper(10_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(100_000, true, WRITETYPE.PUT);
+   benchMarkHelper(100_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000_000, false, WRITETYPE.PUT);
+
+   // put with disableWAL=true VS write batch with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS write batch 
with disableWAL=false <--");
+   benchMarkHelper(1_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(10_000, true, WRITETYPE.PUT);
+   benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(100_000, true, WRITETYPE.PUT);
+   benchMarkHelper(100_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH);
+
+   // write batch with disableWAL=true VS write batch disableWAL = 
true
+   System.out.pri

[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-13 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174231173
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   @Test
+   public void basicTest() throws Exception {
+
+   List<Tuple2<byte[], byte[]>> data = new ArrayList<>(10000);
+   for (int i = 0; i < 10000; ++i) {
+   data.add(new Tuple2<>(("key:" + i).getBytes(), 
("value:" + i).getBytes()));
+   }
+
+   try (RocksDB db = 
RocksDB.open(folder.newFolder().getAbsolutePath());
+   WriteOptions options = new 
WriteOptions().setDisableWAL(true);
+   ColumnFamilyHandle handle = db.createColumnFamily(new 
ColumnFamilyDescriptor("test".getBytes()));
+   RocksDBWriteBatchWrapper writeBatchWrapper = new 
RocksDBWriteBatchWrapper(db, options, 200)) {
+
+   // insert data
+   for (Tuple2<byte[], byte[]> item : data) {
+   writeBatchWrapper.put(handle, item.f0, item.f1);
+   }
+   writeBatchWrapper.flush();
+
+   // valid result
+   for (Tuple2<byte[], byte[]> item : data) {
+   Assert.assertArrayEquals(item.f1, 
db.get(handle, item.f0));
+   }
+   }
+   }
+
+   @Test
+   @Ignore
+   public void benchMark() throws Exception {
+
+   // put with disableWAL=true VS put with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS put with 
disableWAL=false <--");
+   benchMarkHelper(1_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(10_000, true, WRITETYPE.PUT);
+   benchMarkHelper(10_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(100_000, true, WRITETYPE.PUT);
+   benchMarkHelper(100_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000_000, false, WRITETYPE.PUT);
+
+   // put with disableWAL=true VS write batch with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS write batch 
with disableWAL=false <--");
+   benchMarkHelper(1_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(10_000, true, WRITETYPE.PUT);
+   benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(100_000, true, WRITETYPE.PUT);
+   benchMarkHelper(100_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH);
+
+   // write batch with disableWAL=true VS write batch disableWAL = 
true
+   System.out.pri

[GitHub] flink issue #5680: [FLINK-8919] Add KeyedProcessFunctionWithCleanupState.

2018-03-12 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5680
  
shall we add a unit test?


---


[GitHub] flink pull request #5677: [hotfix] update doc of InternalTimerService.regist...

2018-03-09 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5677

[hotfix] update doc of InternalTimerService.registerEventTimeTimer()

## What is the purpose of the change

update doc of InternalTimerService.registerEventTimeTimer()

## Brief change log

update doc of InternalTimerService.registerEventTimeTimer()

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5677.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5677


commit 410601674a532f01268acd37c9c043b39d9ae6b1
Author: Bowen Li <bowenli86@...>
Date:   2018-03-10T07:35:15Z

[hotfix] update doc of InternalTimerService.registerEventTimeTimer()




---


[GitHub] flink issue #5663: [FLINK-8888] [Kinesis Connectors] Update the AWS SDK for ...

2018-03-09 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5663
  
Is your test Flink job both reading from and writing to Kinesis, i.e. are 
both KCL and KPL tested?

If so, +1


---


[GitHub] flink issue #5356: [FLINK-8364][state backend] Add iterator() to ListState w...

2018-03-09 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5356
  
hmmm I think you are right, this actually might be a non-issue in the 
first place


---


[GitHub] flink pull request #5365: [FLINK-8515] update RocksDBMapState to replace dep...

2018-03-09 Thread bowenli86
Github user bowenli86 closed the pull request at:

https://github.com/apache/flink/pull/5365


---


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-07 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r172935414
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+   private final static int MIN_CAPACITY = 100;
+   private final static int MAX_CAPACITY = 1000;
+
+   private final RocksDB db;
+
+   private final WriteBatch batch;
+
+   private final WriteOptions options;
+
+   private final int capacity;
+
+   private int currentSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+   
@Nonnull WriteOptions options,
+   int 
capacity) {
+
+   Preconditions.checkArgument(capacity >= MIN_CAPACITY && 
capacity <= MAX_CAPACITY,
+   "capacity should at least greater than 100");
+
+   this.db = rocksDB;
+   this.options = options;
+   this.capacity = capacity;
+   this.batch = new WriteBatch(this.capacity);
+   this.currentSize = 0;
+   }
+
+   public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) 
throws RocksDBException {
+
+   this.batch.put(handle, key, value);
+
+   if (++currentSize == capacity) {
+   flush();
+   }
+   }
+
+   public void flush() throws RocksDBException {
+   this.db.write(options, batch);
+   batch.clear();
+   currentSize = 0;
+   }
+
+   @Override
+   public void close() throws RocksDBException {
+   if (batch != null) {
--- End diff --

can batch be null?


---


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-07 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r172935214
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+   private final static int MIN_CAPACITY = 100;
+   private final static int MAX_CAPACITY = 1000;
+
+   private final RocksDB db;
+
+   private final WriteBatch batch;
+
+   private final WriteOptions options;
+
+   private final int capacity;
+
+   private int currentSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+   
@Nonnull WriteOptions options,
+   int 
capacity) {
+
+   Preconditions.checkArgument(capacity >= MIN_CAPACITY && 
capacity <= MAX_CAPACITY,
+   "capacity should at least greater than 100");
+
+   this.db = rocksDB;
+   this.options = options;
+   this.capacity = capacity;
+   this.batch = new WriteBatch(this.capacity);
+   this.currentSize = 0;
+   }
+
+   public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) 
throws RocksDBException {
--- End diff --

need synchronization on put() and flush()
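
A minimal sketch of what that could look like, assuming coarse-grained locking 
on the wrapper instance is acceptable (bodies copied from the diff above, only 
`synchronized` added):

    // Sketch only: serializes concurrent writers and flushes on the wrapper's
    // monitor; flush() remains callable from put() since the lock is reentrant.
    public synchronized void put(ColumnFamilyHandle handle, byte[] key, byte[] value)
            throws RocksDBException {
        this.batch.put(handle, key, value);
        if (++currentSize == capacity) {
            flush();
        }
    }

    public synchronized void flush() throws RocksDBException {
        this.db.write(options, batch);
        batch.clear();
        currentSize = 0;
    }

Whether locking belongs in the wrapper or callers should guarantee 
single-threaded access is a design call for the PR.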


---


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-07 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r172934683
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+   private final static int MIN_CAPACITY = 100;
+   private final static int MAX_CAPACITY = 1000;
+
+   private final RocksDB db;
+
+   private final WriteBatch batch;
+
+   private final WriteOptions options;
+
+   private final int capacity;
+
+   private int currentSize;
+
+   public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+   
@Nonnull WriteOptions options,
+   int 
capacity) {
+
+   Preconditions.checkArgument(capacity >= MIN_CAPACITY && 
capacity <= MAX_CAPACITY,
+   "capacity should at least greater than 100");
--- End diff --

how is the capacity range determined - is it recommended by RocksDB?

the msg should be: "capacity should be between " + MIN + " and " + MAX
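
i.e., something like (constants as declared in the class above):

    Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
        "capacity should be between " + MIN_CAPACITY + " and " + MAX_CAPACITY);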


---


[GitHub] flink pull request #5649: [FLINK-8873] [DataStream API] [Tests] move unit te...

2018-03-06 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5649

[FLINK-8873] [DataStream API] [Tests] move unit tests of KeyedStream from 
DataStreamTest to KeyedStreamTest

## What is the purpose of the change

move unit tests of `KeyedStream` from `DataStreamTest` to 
`KeyedStreamTest`, in order to have a clearer separation

## Brief change log

added `KeyedStreamTest.java` and `KeyedStreamTest.scala`, and moved related 
unit tests to them

## Verifying this change

This change is already covered by existing tests, such as *KeyedStreamTest*.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8873

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5649.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5649


commit d4e372fbb21edc0507df461aa7f47a9168350a1a
Author: Bowen Li <bowenli86@...>
Date:   2018-03-05T19:52:37Z

[FLINK-8873] move unit tests of KeyedStream from DataStreamTest to 
KeyedStreamTest




---


[GitHub] flink issue #5501: [FLINK-6053][metrics] Add new Number-/StringGauge metric ...

2018-03-06 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5501
  
LGTM, +1 on merging to 1.6.0


---


[GitHub] flink issue #5481: [FLINK-8560] Access to the current key in ProcessFunction...

2018-03-05 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5481
  
@kl0u @aljoscha   I added the Scala example, and I believe the only build 
failure in Travis is irrelevant


---


[GitHub] flink pull request #5482: [FLINK-8480][DataStream] Add Java API for timeboun...

2018-03-05 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r172306197
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will 
emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both 
the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * As soon as elements are joined they are passed to a user-defined 
{@link TimeBoundedJoinFunction},
+ * as a {@link Tuple2}, with f0 being the left element and f1 being the 
right element
+ *
+ * The basic idea of this implementation is as follows: Whenever we 
receive an element at
+ * {@link #processElement1(StreamRecord)} (a.k.a. the left side), we add 
it to the left buffer.
+ * We then check the right buffer to see whether there are any elements 
that can be joined. If
+ * there are, they are joined and passed to a user-defined {@link 
TimeBoundedJoinFunction}.
+ * The same happens the other way around when receiving an element on the 
right side.
+ *
+ * In some cases the watermark needs to be delayed. This for example 
can happen
+ * if t2.ts ∈ [t1.ts + 1, t1.ts + 2] and elements from t1 arrive earlier 
than elements from t2 and
+ * therefore get added to the left buffer. When an element now arrives on 
the right side, the
+ * watermark might have already progressed. The right element now gets 
joined with an
+ * older element from the left side, where the timestamp of the left 
element is lower than the
+ * current watermark, which would make this element late. This can be 
avoided by holding back the
+ * watermarks.
+ *
+ * The left and right buffers are cleared from unused values 
periodically
+ * (triggered by watermarks) in order not to grow infinitely.
+ *
+ *
+ * @param <T1> The type of the elements in the left stream
+ * @param <T2> The type of the elements in the right stream
+ * @param <OUT> The output type created by the u

[GitHub] flink pull request #5482: [FLINK-8480][DataStream] Add Java API for timeboun...

2018-03-05 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r172303424
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ---
@@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector<T2, KEY> 
keySelector)  {
public <W extends Window> WithWindow<T1, T2, KEY, W> 
window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
return new WithWindow<>(input1, input2, 
keySelector1, keySelector2, keyType, assigner, null, null);
}
+
+   /**
+* Specifies the time boundaries over which the join 
operation works, so that
+* leftElement.timestamp + lowerBound <= 
rightElement.timestamp <= leftElement.timestamp + upperBound
+* By default both the lower and the upper bound are 
inclusive. This can be configured
+* with {@link 
TimeBounded#lowerBoundExclusive(boolean)} and
+* {@link TimeBounded#upperBoundExclusive(boolean)}
+*
+* @param lowerBound The lower bound. Needs to be 
smaller than or equal to the upperBound
+* @param upperBound The upper bound. Needs to be 
bigger than or equal to the lowerBound
+*/
+   public TimeBounded<T1, T2, KEY> between(Time 
lowerBound, Time upperBound) {
+
+   TimeCharacteristic timeCharacteristic =
+   
input1.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+   if (timeCharacteristic != 
TimeCharacteristic.EventTime) {
+   throw new 
RuntimeException("Time-bounded stream joins are only supported in event time");
+   }
+
+   checkNotNull(lowerBound, "A lower bound needs 
to be provided for a time-bounded join");
+   checkNotNull(upperBound, "An upper bound needs 
to be provided for a time-bounded join");
+   return new TimeBounded<>(
+   input1,
+   input2,
+   lowerBound.toMilliseconds(),
+   upperBound.toMilliseconds(),
+   true,
+   true,
+   keySelector1,
+   keySelector2
+   );
+   }
+   }
+   }
+
+   /**
+* Joined streams that have keys for both sides as well as the time 
boundaries over which
+* elements should be joined defined.
+*
+* @param <IN1> Input type of elements from the first stream
+* @param <IN2> Input type of elements from the second stream
+* @param <KEY> The type of the key
+*/
+   public static class TimeBounded<IN1, IN2, KEY> {
+
+   private static final String TIMEBOUNDED_JOIN_FUNC_NAME = 
"TimeBoundedJoin";
+
+   private final DataStream<IN1> left;
+   private final DataStream<IN2> right;
+
+   private final long lowerBound;
+   private final long upperBound;
+
+   private final KeySelector<IN1, KEY> keySelector1;
+   private final KeySelector<IN2, KEY> keySelector2;
+
+   private boolean lowerBoundInclusive;
+   private boolean upperBoundInclusive;
+
+   public TimeBounded(
+   DataStream<IN1> left,
+   DataStream<IN2> right,
+   long lowerBound,
+   long upperBound,
+   boolean lowerBoundInclusive,
+   boolean upperBoundInclusive,
+   KeySelector<IN1, KEY> keySelector1,
+   KeySelector<IN2, KEY> keySelector2) {
+
+   this.left = Preconditions.checkNotNull(left);
+   this.right = Preconditions.checkNotNull(right);
+
+   this.lowerBound = lowerBound;
+   this.upperBound = upperBound;
+
+   this.lowerBoundInclusive = lowerBoundInclusive;
+   this.upperBoundInclusive = upperBoundInclusive;
+
+   this.keySelector1 = 
Preconditions.checkNotNull(keySelector1);
+   this.keySelector2 = 
Preconditions.checkNotNull(keySelector2);
+   }
+
+   /**
+* Configure whether the upper bound shoul

[GitHub] flink pull request #5482: [FLINK-8480][DataStream] Add Java API for timeboun...

2018-03-05 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r172302147
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ---
@@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector<T2, KEY> 
keySelector)  {
public <W extends Window> WithWindow<T1, T2, KEY, W> 
window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
return new WithWindow<>(input1, input2, 
keySelector1, keySelector2, keyType, assigner, null, null);
}
+
+   /**
+* Specifies the time boundaries over which the join 
operation works, so that
+* leftElement.timestamp + lowerBound <= 
rightElement.timestamp <= leftElement.timestamp + upperBound
+* By default both the lower and the upper bound are 
inclusive. This can be configured
+* with {@link 
TimeBounded#lowerBoundExclusive(boolean)} and
+* {@link TimeBounded#upperBoundExclusive(boolean)}
+*
+* @param lowerBound The lower bound. Needs to be 
smaller than or equal to the upperBound
+* @param upperBound The upper bound. Needs to be 
bigger than or equal to the lowerBound
+*/
+   public TimeBounded<T1, T2, KEY> between(Time 
lowerBound, Time upperBound) {
+
+   TimeCharacteristic timeCharacteristic =
+   
input1.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+   if (timeCharacteristic != 
TimeCharacteristic.EventTime) {
+   throw new 
RuntimeException("Time-bounded stream joins are only supported in event time");
--- End diff --

should use `IllegalStateException`. or even better, shall we create a Flink 
specific exception?
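
For example (sketch of the first option; a dedicated Flink exception type 
would read the same with a different class name):

    if (timeCharacteristic != TimeCharacteristic.EventTime) {
        throw new IllegalStateException(
            "Time-bounded stream joins are only supported in event time");
    }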


---


[GitHub] flink pull request #5482: [FLINK-8480][DataStream] Add Java API for timeboun...

2018-03-05 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r172302583
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 ---
@@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector<T2, KEY> 
keySelector)  {
public <W extends Window> WithWindow<T1, T2, KEY, W> 
window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
return new WithWindow<>(input1, input2, 
keySelector1, keySelector2, keyType, assigner, null, null);
}
+
+   /**
+* Specifies the time boundaries over which the join 
operation works, so that
+* leftElement.timestamp + lowerBound <= 
rightElement.timestamp <= leftElement.timestamp + upperBound
+* By default both the lower and the upper bound are 
inclusive. This can be configured
+* with {@link 
TimeBounded#lowerBoundExclusive(boolean)} and
+* {@link TimeBounded#upperBoundExclusive(boolean)}
+*
+* @param lowerBound The lower bound. Needs to be 
smaller than or equal to the upperBound
+* @param upperBound The upper bound. Needs to be 
bigger than or equal to the lowerBound
+*/
+   public TimeBounded<T1, T2, KEY> between(Time 
lowerBound, Time upperBound) {
+
+   TimeCharacteristic timeCharacteristic =
+   
input1.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+   if (timeCharacteristic != 
TimeCharacteristic.EventTime) {
+   throw new 
RuntimeException("Time-bounded stream joins are only supported in event time");
+   }
+
+   checkNotNull(lowerBound, "A lower bound needs 
to be provided for a time-bounded join");
+   checkNotNull(upperBound, "An upper bound needs 
to be provided for a time-bounded join");
+   return new TimeBounded<>(
+   input1,
+   input2,
+   lowerBound.toMilliseconds(),
+   upperBound.toMilliseconds(),
+   true,
+   true,
+   keySelector1,
+   keySelector2
+   );
+   }
+   }
+   }
+
+   /**
+* Joined streams that have keys for both sides as well as the time 
boundaries over which
+* elements should be joined defined.
+*
+* @param <IN1> Input type of elements from the first stream
+* @param <IN2> Input type of elements from the second stream
+* @param <KEY> The type of the key
+*/
+   public static class TimeBounded<IN1, IN2, KEY> {
+
+   private static final String TIMEBOUNDED_JOIN_FUNC_NAME = 
"TimeBoundedJoin";
--- End diff --

hmm... this might not be very relevant, but I'd prefer a single config 
class that holds all functions' names, rather than having them scattered all 
over the code base. 
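
Something along these lines (a hypothetical holder class; the name and 
location are illustrative, not part of this PR):

    /** Hypothetical central registry for operator/function display names. */
    public final class FunctionNames {

        public static final String TIMEBOUNDED_JOIN = "TimeBoundedJoin";

        private FunctionNames() {
            // no instantiation
        }
    }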


---


[GitHub] flink pull request #5482: [FLINK-8480][DataStream] Add Java API for timeboun...

2018-03-05 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5482#discussion_r172303671
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimeBoundedStreamJoinOperator.java
 ---
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A TwoInputStreamOperator to execute time-bounded stream inner joins.
+ *
+ * By using a configurable lower and upper bound this operator will 
emit exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both 
the lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
--- End diff --

bound**s**


---


[GitHub] flink issue #5616: [FLINK-8828] [stream, dataset, scala] Introduce collect m...

2018-03-02 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5616
  
need to add this to the Java API as well


---


[GitHub] flink issue #5481: [FLINK-8560] Access to the current key in ProcessFunction...

2018-03-01 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5481
  
@kl0u I added the comments for `@deprecated` in the javadoc. Let me know if 
you can merge the two related PRs. Thanks


---


[GitHub] flink issue #5500: [FLINK-8667] expose key in KeyedBroadcastProcessFunction#...

2018-02-28 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5500
  
@kl0u @aljoscha I've updated this PR, and its build is green


---


[GitHub] flink issue #5500: [FLINK-8667] expose key in KeyedBroadcastProcessFunction#...

2018-02-27 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5500
  
@tillrohrmann  @kl0u Thanks for reviewing, guys 

As @pnowojski mentioned, we three decided to expose timer keys in 
`ProcessFunction` in [FLINK-8560](https://github.com/apache/flink/pull/5481).  
Exposing timer keys in `KeyedBroadcastProcessFunction` extends that design. I 
think we should get this PR into 1.5.0 so we don't need to do the [complicated 
refactoring for FLINK-8560](https://github.com/apache/flink/pull/5481) to 
support backward compatibility 


---


[GitHub] flink issue #5500: [FLINK-8667] expose key in KeyedBroadcastProcessFunction#...

2018-02-22 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5500
  
Hi @aljoscha , can you take a look? 


---


[GitHub] flink issue #5481: [FLINK-8560] Access to the current key in ProcessFunction...

2018-02-21 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5481
  
Thanks for the review and suggestions. And your comment on 
`DataStream#process(KeyedProcessFunction)` makes sense, I've removed it. 

(btw, I feel https://github.com/apache/flink/pull/5500 is more urgent than 
this PR. Can you take a look at that one?)


---


[GitHub] flink issue #5481: [FLINK-8560] Access to the current key in ProcessFunction...

2018-02-20 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5481
  
cc @pnowojski  @aljoscha 


---


[GitHub] flink issue #5522: [FLINK-8710] [YARN] AbstractYarnClusterDescriptor doesn't...

2018-02-20 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5522
  
cc @tillrohrmann 


---


[GitHub] flink issue #5500: [FLINK-8667] expose key in KeyedBroadcastProcessFunction#...

2018-02-20 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5500
  
cc @pnowojski @aljoscha 


---


[GitHub] flink pull request #5537: [FLINK-8719] add module description for flink-cont...

2018-02-20 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5537

[FLINK-8719] add module description for flink-contrib to clarify its purpose

## What is the purpose of the change

flink-contrib currently doesn't have any description of its purpose, which 
confuses many developers. This change adds a clarifying module description.

## Brief change log

Add a module description, which I borrowed from the PR description of 
https://github.com/apache/flink/pull/5523

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8719

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5537.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5537


commit aecfcdd6e776f2b885f4cb5288bcb1b27d4b23cd
Author: Bowen Li <bowenli86@...>
Date:   2018-02-20T19:04:43Z

[FLINK-8719] add module description for flink-contrib to clarify its purpose




---


[GitHub] flink issue #5500: [FLINK-8667] expose key in KeyedBroadcastProcessFunction#...

2018-02-19 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5500
  
an irrelevant flaky unit test caused the error in the Travis build, and I've 
filed [FLINK-8709](https://issues.apache.org/jira/browse/FLINK-8709) for it


---


[GitHub] flink issue #5522: [hotfix] hotfix for AbstractYarnClusterDescriptor

2018-02-19 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5522
  
the failure in the Travis build is irrelevant


---


[GitHub] flink pull request #5522: [hotfix] [javadoc] fix wrong javadoc in AbstractYa...

2018-02-19 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5522

[hotfix] [javadoc] fix wrong javadoc in AbstractYarnClusterDescriptor

## What is the purpose of the change

hotfix of javadoc in AbstractYarnClusterDescriptor

## Brief change log

hotfix of javadoc in AbstractYarnClusterDescriptor

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5522.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5522


commit ae112511cf41931e8010b5e0607714ce02f5cc2f
Author: Bowen Li <bowenli86@...>
Date:   2018-02-19T08:04:54Z

fix wrong javadoc




---


[GitHub] flink issue #5500: [FLINK-8667] expose key in KeyedBroadcastProcessFunction#...

2018-02-17 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5500
  
@pnowojski @aljoscha  I updated the code. Hopefully we can make this into 
1.5.0! Thanks!


---


[GitHub] flink issue #5501: [FLINK-6053][metrics] Add new Number-/StringGauge metric ...

2018-02-16 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5501
  
LGTM generally.

I still feel having all the `instanceof` in `notifyOfAddedMetric` and 
`notifyOfRemovedMetric` is a bit inelegant. I'm fine with it since there'll 
(hopefully) be only a limited number of metric types, so the `instanceof` 
clauses won't grow insanely.
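
For illustration, one alternative to growing `instanceof` chains is a 
class-keyed handler map; everything below is a hypothetical sketch, not this 
PR's API:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Consumer;

    // Hypothetical sketch: dispatch on the metric's concrete class instead of
    // chaining instanceof checks.
    public class MetricDispatch {

        private final Map<Class<?>, Consumer<Object>> handlers = new HashMap<>();

        // Register one handler per supported metric type.
        public <T> void register(Class<T> type, Consumer<T> handler) {
            handlers.put(type, m -> handler.accept(type.cast(m)));
        }

        // Would back notifyOfAddedMetric: look up the handler for the exact class.
        public void dispatch(Object metric) {
            Consumer<Object> handler = handlers.get(metric.getClass());
            if (handler != null) {
                handler.accept(metric);
            }
        }
    }

One caveat: the exact-class lookup doesn't cover subclasses the way 
`instanceof` does, which is partly why the chains tend to survive in practice.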




---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-16 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168891314
  
--- Diff: docs/ops/state/large_state_tuning.md ---
@@ -234,4 +234,97 @@ Compression can be activated through the 
`ExecutionConfig`:
 **Notice:** The compression option has no impact on incremental snapshots, 
because they are using RocksDB's internal
 format which is always using snappy compression out of the box.
 
+## Task-Local Recovery
+
+### Motivation
+
+In Flink's checkpointing, each task produces a snapshot of its state that 
is then written to a distributed store. Each task acknowledges
+a successful write of the state to the job manager by sending a handle 
that describes the location of the state in the distributed store.
+The job manager, in turn, collects the handles from all tasks and bundles 
them into a checkpoint object.
+
+In case of recovery, the job manager opens the latest checkpoint object 
and sends the handles back to the corresponding tasks, which can
+then restore their state from the distributed storage. Using a distributed 
storage to store state has two important advantages. First, the storage
+is fault tolerant and second, all state in the distributed store is 
accessible to all nodes and can be easily redistributed (e.g. for rescaling).
+
+However, using a remote distributed store has also one big disadvantage: 
all tasks must read their state from a remote location, over the network.
+In many scenarios, recovery could reschedule failed tasks to the same task 
manager as in the previous run (of course there are exceptions like machine
+failures), but we still have to read remote state. This can result in 
*long recovery times for large states*, even if there was only a small failure 
on
+a single machine.
+
+### Approach
+
+Task-local state recovery targets exactly this problem of long recovery 
times and the main idea is the following: for every checkpoint, we do not
+only write task states to the distributed storage, but also keep *a 
secondary copy of the state snapshot in a storage that is local to the task*
+(e.g. on local disk or in memory). Notice that the primary store for 
snapshots must still be the distributed store, because local storage does not
+ensure durability under node failures and also does not provide access for 
other nodes to redistribute state; this functionality still requires the
+primary copy.
+
+However, for each task that can be rescheduled to the previous location 
for recovery, we can restore state from the secondary, local
+copy and avoid the costs of reading the state remotely. Given that *many 
failures are not node failures and node failures typically only affect one
+or very few nodes at a time*, it is very likely that in a recovery most 
tasks can return to their previous location and find their local state intact.
+This is what makes local recovery effective in reducing recovery time.
+
+Please note that this can come at some additional costs per checkpoint for 
creating and storing the secondary local state copy, depending on the
+chosen state backend and checkpointing strategy. For example, in most 
cases the implementation will simply duplicate the writes to the distributed
+store to a local file.
+
+
+
+### Relationship of primary (distributed store) and secondary (task-local) 
state snapshots
+
+Task-local state is always considered a secondary copy, the ground truth 
of the checkpoint state is the primary copy in the distributed store. This
+has implications for problems with local state during checkpointing and 
recovery:
+
+- For checkpointing, the *primary copy must be successful* and a failure 
to produce the *secondary, local copy will not fail* the checkpoint. A 
checkpoint
+will fail if the primary copy could not be created, even if the secondary 
copy was successfully created.
+
+- Only the primary copy is acknowledged and managed by the job manager, 
secondary copies are owned by task managers and their life cycle can be
+independent from their primary copy. For example, it is possible to retain 
a history of the 3 latest checkpoints as primary copies and only keep
+the task-local state of the latest checkpoint.
+
+- For recovery, Flink will always *attempt to restore from task-local 
state first*, if a matching secondary copy is available. If any problem occurs 
during
+the recovery from the secondary copy, Flink will *transparently retry to 
recover the task from the primary copy*. Recovery only fails if primary
+and the (optional) secondary copy failed. In this case, depending on the 
configuration Flink could still fall back to an older checkpoint.
+
+- It is possible that the task-local copy contains only parts of the full 
task state (e.g. exception

[GitHub] flink pull request #5500: [FLINK-8667] expose key in KeyedBroadcastProcessFu...

2018-02-16 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5500#discussion_r168855562
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
 ---
@@ -324,6 +324,11 @@ public TimeDomain timeDomain() {
return timeDomain;
}
 
+   @Override
+   public KS getCurrentKey() {
--- End diff --

added


---


[GitHub] flink issue #5500: [FLINK-8667] expose key in KeyedBroadcastProcessFunction#...

2018-02-15 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5500
  
cc @aljoscha  @pnowojski 


---

