[jira] [Commented] (FLINK-10123) Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577019#comment-16577019
 ] 

ASF GitHub Bot commented on FLINK-10123:


yanghua commented on issue #6539: [FLINK-10123] Use ExecutorThreadFactory 
instead of DefaultThreadFactory in RestServer/Client
URL: https://github.com/apache/flink/pull/6539#issuecomment-412244963
 
 
   Till, Scala checkstyle error:
   
   ```
   error file=/home/travis/build/apache/flink/flink-runtime/src/main/scala/akka/actor/RobustActorSystem.scala message=File line length exceeds 100 characters line=88
   ```
   
   Other Travis build tasks also failed because of connection timeouts.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client
> --
>
> Key: FLINK-10123
> URL: https://issues.apache.org/jira/browse/FLINK-10123
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Instead of using the {{DefaultThreadFactory}} in the 
> {{RestServerEndpoint}}/{{RestClient}} we should use the 
> {{ExecutorThreadFactory}} because it uses the {{FatalExitExceptionHandler}} 
> per default as the uncaught exception handler. This should guard against 
> uncaught exceptions by simply terminating the JVM.
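For illustration, here is a minimal, hypothetical sketch of the behaviour this issue asks for: a thread factory that installs a JVM-terminating uncaught exception handler on every thread it creates. The class and names below are stand-ins, not Flink's actual {{ExecutorThreadFactory}}/{{FatalExitExceptionHandler}} implementation.

{code:java}
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not Flink code: every created thread gets an uncaught
// exception handler that halts the JVM, so a dying REST thread cannot fail silently.
public class ExitOnFailureThreadFactory implements ThreadFactory {

    private final String poolName;
    private final AtomicInteger counter = new AtomicInteger(0);
    private final UncaughtExceptionHandler handler = (thread, error) -> {
        try {
            System.err.println("Thread " + thread.getName() + " died due to an uncaught exception: " + error);
        } finally {
            Runtime.getRuntime().halt(-1); // terminate the JVM, as the issue suggests
        }
    };

    public ExitOnFailureThreadFactory(String poolName) {
        this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread t = new Thread(runnable, poolName + "-thread-" + counter.incrementAndGet());
        t.setDaemon(true);
        t.setUncaughtExceptionHandler(handler);
        return t;
    }
}
{code}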



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6539: [FLINK-10123] Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client

2018-08-10 Thread GitBox
yanghua commented on issue #6539: [FLINK-10123] Use ExecutorThreadFactory 
instead of DefaultThreadFactory in RestServer/Client
URL: https://github.com/apache/flink/pull/6539#issuecomment-412244963
 
 
   Till, Scala checkstyle error:
   
   ```
   error file=/home/travis/build/apache/flink/flink-runtime/src/main/scala/akka/actor/RobustActorSystem.scala message=File line length exceeds 100 characters line=88
   ```
   
   Other Travis build tasks also failed because of connection timeouts.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-10125) Unclosed ByteArrayDataOutputView in RocksDBMapState

2018-08-10 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-10125:


Assignee: vinoyang

> Unclosed ByteArrayDataOutputView in RocksDBMapState
> ---
>
> Key: FLINK-10125
> URL: https://issues.apache.org/jira/browse/FLINK-10125
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> {code}
>   ByteArrayDataOutputView dov = new ByteArrayDataOutputView(1);
> {code}
> dov is used in a try block, but it is not closed if an exception is thrown.
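As a hedged illustration of the fix the report implies (generic JDK classes only, not Flink's {{ByteArrayDataOutputView}}), try-with-resources closes the resource on both the success path and the exception path:

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// General pattern only, with hypothetical serialization logic: the output
// stream is closed even if writeInt(...) throws, which is what the report asks for.
public class CloseOnExceptionExample {

    static byte[] serialize(int value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(1);
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(value);       // may throw; the stream is still closed
            return bytes.toByteArray();
        }                              // out.close() also runs on the exception path
    }

    public static void main(String[] args) throws IOException {
        System.out.println(serialize(42).length); // prints 4
    }
}
{code}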



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5315) Support distinct aggregations in table api

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577011#comment-16577011
 ] 

ASF GitHub Bot commented on FLINK-5315:
---

hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] 
Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r209413346
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
 ##
 @@ -136,4 +136,11 @@ abstract class AggregateFunction[T, ACC] extends 
UserDefinedFunction {
 * accumulator type should be automatically inferred.
 */
   def getAccumulatorType: TypeInformation[ACC] = null
+
+  private[flink] var isDistinctAgg: Boolean = false
+
+  private[flink] def distinct: AggregateFunction[T, ACC] = {
 
 Review comment:
   good catch!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support distinct aggregations in table api
> --
>
> Key: FLINK-5315
> URL: https://issues.apache.org/jira/browse/FLINK-5315
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Kurt Young
>Assignee: Rong Rong
>Priority: Major
>  Labels: pull-request-available
>
> Support distinct aggregations in Table API in the following format:
> For Expressions:
> {code:scala}
> 'a.count.distinct // Expressions distinct modifier
> {code}
> For User-defined Function:
> {code:scala}
> singleArgUdaggFunc.distinct('a) // FunctionCall distinct modifier
> multiArgUdaggFunc.distinct('a, 'b) // FunctionCall distinct modifier
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream

2018-08-10 Thread GitBox
hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] 
Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r209413346
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
 ##
 @@ -136,4 +136,11 @@ abstract class AggregateFunction[T, ACC] extends 
UserDefinedFunction {
 * accumulator type should be automatically inferred.
 */
   def getAccumulatorType: TypeInformation[ACC] = null
+
+  private[flink] var isDistinctAgg: Boolean = false
+
+  private[flink] def distinct: AggregateFunction[T, ACC] = {
 
 Review comment:
   good catch!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-10125) Unclosed ByteArrayDataOutputView in RocksDBMapState

2018-08-10 Thread Ted Yu (JIRA)
Ted Yu created FLINK-10125:
--

 Summary: Unclosed ByteArrayDataOutputView in RocksDBMapState
 Key: FLINK-10125
 URL: https://issues.apache.org/jira/browse/FLINK-10125
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu


{code}
  ByteArrayDataOutputView dov = new ByteArrayDataOutputView(1);
{code}
dov is used in a try block, but it is not closed if an exception is thrown.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes

2018-08-10 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-10041.
--
   Resolution: Implemented
Fix Version/s: 1.7.0

Merged in:
master: 9d67afbb84

> Extract all different iterators (inner or static inner classes) into full 
> classes
> -
>
> Key: FLINK-10041
> URL: https://issues.apache.org/jira/browse/FLINK-10041
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10124) Use ByteArrayDataInput/OutputView instead of stream + wrapper

2018-08-10 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-10124.
--
   Resolution: Implemented
Fix Version/s: 1.7.0

Merged in:
master: 18ff4ab

> Use ByteArrayDataInput/OutputView instead of stream + wrapper
> -
>
> Key: FLINK-10124
> URL: https://issues.apache.org/jira/browse/FLINK-10124
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576664#comment-16576664
 ] 

ASF GitHub Bot commented on FLINK-10041:


asfgit closed pull request #6501: [FLINK-10041] Extract iterators from 
RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
 
b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
index 33836f0c781..698a9f97dc0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
@@ -53,4 +53,8 @@ public void setPosition(int pos) {
public void setData(@Nonnull byte[] buffer, int offset, int length) {
inStreamWithPos.setBuffer(buffer, offset, length);
}
+
+   public void setData(@Nonnull byte[] buffer) {
+   setData(buffer, 0, buffer.length);
+   }
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
index 32819f84e46..2a9ab7589a9 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
@@ -20,8 +20,6 @@
 
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.runtime.state.internal.InternalAppendingState;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -63,7 +61,8 @@ SV getInternal(byte[] key) {
if (valueBytes == null) {
return null;
}
-   return valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+   dataInputView.setData(valueBytes);
+   return valueSerializer.deserialize(dataInputView);
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while retrieving 
data from RocksDB", e);
}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 7483089106f..65b7f1fa4a7 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -20,9 +20,8 @@
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.internal.InternalKvState;
@@ -67,9 +66,9 @@
 
protected final WriteOptions writeOptions;
 
-   protected final ByteArrayOutputStreamWithPos keySerializationStream;
+   protected final ByteArrayDataOutputView dataOutputView;
 
-   protected final DataOutputView keySerializationDataOutputView;
+   protected final ByteArrayDataInputView dataInputView;
 
private final boolean ambiguousKeyPossible;
 
@@ -98,9 +97,10 @@ protected AbstractRocksDBState(
this.valueSerializer = 
Preconditions.checkNotNull(valueSerializer, "State value serializer");
this.defaultValue = defaultValue;
 
-   this.keySerializationStream = new 

[GitHub] asfgit closed pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner …

2018-08-10 Thread GitBox
asfgit closed pull request #6501: [FLINK-10041] Extract iterators from 
RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
 
b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
index 33836f0c781..698a9f97dc0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
@@ -53,4 +53,8 @@ public void setPosition(int pos) {
public void setData(@Nonnull byte[] buffer, int offset, int length) {
inStreamWithPos.setBuffer(buffer, offset, length);
}
+
+   public void setData(@Nonnull byte[] buffer) {
+   setData(buffer, 0, buffer.length);
+   }
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
index 32819f84e46..2a9ab7589a9 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
@@ -20,8 +20,6 @@
 
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.runtime.state.internal.InternalAppendingState;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -63,7 +61,8 @@ SV getInternal(byte[] key) {
if (valueBytes == null) {
return null;
}
-   return valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+   dataInputView.setData(valueBytes);
+   return valueSerializer.deserialize(dataInputView);
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while retrieving 
data from RocksDB", e);
}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 7483089106f..65b7f1fa4a7 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -20,9 +20,8 @@
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.internal.InternalKvState;
@@ -67,9 +66,9 @@
 
protected final WriteOptions writeOptions;
 
-   protected final ByteArrayOutputStreamWithPos keySerializationStream;
+   protected final ByteArrayDataOutputView dataOutputView;
 
-   protected final DataOutputView keySerializationDataOutputView;
+   protected final ByteArrayDataInputView dataInputView;
 
private final boolean ambiguousKeyPossible;
 
@@ -98,9 +97,10 @@ protected AbstractRocksDBState(
this.valueSerializer = 
Preconditions.checkNotNull(valueSerializer, "State value serializer");
this.defaultValue = defaultValue;
 
-   this.keySerializationStream = new 
ByteArrayOutputStreamWithPos(128);
-   this.keySerializationDataOutputView = new 
DataOutputViewStreamWrapper(keySerializationStream);
-   this.ambiguousKeyPossible = 

[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576660#comment-16576660
 ] 

ASF GitHub Bot commented on FLINK-10041:


StefanRRichter commented on issue #6501: [FLINK-10041] Extract iterators from 
RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#issuecomment-412162109
 
 
   @bowenli86 @azagrebin thanks for the reviews. Merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extract all different iterators (inner or static inner classes) into full 
> classes
> -
>
> Key: FLINK-10041
> URL: https://issues.apache.org/jira/browse/FLINK-10041
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StefanRRichter commented on issue #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner …

2018-08-10 Thread GitBox
StefanRRichter commented on issue #6501: [FLINK-10041] Extract iterators from 
RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#issuecomment-412162109
 
 
   @bowenli86 @azagrebin thanks for the reviews. Merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-10124) Use ByteArrayDataInput/OutputView instead of stream + wrapper

2018-08-10 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-10124:
--

 Summary: Use ByteArrayDataInput/OutputView instead of stream + 
wrapper
 Key: FLINK-10124
 URL: https://issues.apache.org/jira/browse/FLINK-10124
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing
Reporter: Stefan Richter
Assignee: Stefan Richter






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576601#comment-16576601
 ] 

ASF GitHub Bot commented on FLINK-10041:


azagrebin commented on issue #6501: [FLINK-10041] Extract iterators from 
RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#issuecomment-412147803
 
 
    LGTM


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extract all different iterators (inner or static inner classes) into full 
> classes
> -
>
> Key: FLINK-10041
> URL: https://issues.apache.org/jira/browse/FLINK-10041
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] azagrebin commented on issue #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner …

2018-08-10 Thread GitBox
azagrebin commented on issue #6501: [FLINK-10041] Extract iterators from 
RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#issuecomment-412147803
 
 
    LGTM


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures

2018-08-10 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576588#comment-16576588
 ] 

vinoyang commented on FLINK-10074:
--

[~trohrm...@apache.org] Does this need more discussion? If not, I will start this work.

> Allowable number of checkpoint failures 
> 
>
> Key: FLINK-10074
> URL: https://issues.apache.org/jira/browse/FLINK-10074
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Thomas Weise
>Assignee: vinoyang
>Priority: Major
>
> For intermittent checkpoint failures it is desirable to have a mechanism to 
> avoid restarts. If, for example, a transient S3 error prevents checkpoint 
> completion, the next checkpoint may very well succeed. The user may wish to 
> not incur the expense of restart under such scenario and this could be 
> expressed with a failure threshold (number of subsequent checkpoint 
> failures), possibly combined with a list of exceptions to tolerate.
>  
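A rough, hypothetical sketch of such a threshold (invented names, not Flink code): a counter of consecutive checkpoint failures that decides when the job should actually be failed.

{code:java}
// Hypothetical sketch of the proposed behaviour; not part of Flink.
public class CheckpointFailureTracker {

    private final int tolerableConsecutiveFailures;
    private int consecutiveFailures;

    public CheckpointFailureTracker(int tolerableConsecutiveFailures) {
        this.tolerableConsecutiveFailures = tolerableConsecutiveFailures;
    }

    /** Called when a checkpoint completes successfully; resets the counter. */
    public void onCheckpointSuccess() {
        consecutiveFailures = 0;
    }

    /**
     * Called when a checkpoint fails. Returns true if the threshold of tolerated
     * consecutive failures has been exceeded and the job should be restarted.
     * A real implementation might also consult a list of tolerated exception
     * types, as suggested above.
     */
    public boolean onCheckpointFailure(Throwable cause) {
        consecutiveFailures++;
        return consecutiveFailures > tolerableConsecutiveFailures;
    }
}
{code}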



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10048) Migrate module flink-gelly-examples to flink-examples and rename it

2018-08-10 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576578#comment-16576578
 ] 

vinoyang commented on FLINK-10048:
--

[~trohrm...@apache.org] Any opinions? If not, I will start this work.

> Migrate module flink-gelly-examples to flink-examples and rename it
> ---
>
> Key: FLINK-10048
> URL: https://issues.apache.org/jira/browse/FLINK-10048
> Project: Flink
>  Issue Type: Improvement
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> I think we can put all the example modules into flink-examples module.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576573#comment-16576573
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method 
to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-412144209
 
 
   Hi @pnowojski, thanks for your suggestion. I have refactored this PR; since we 
both changed the `PrintSinkFunctionTest`, I may not have finished all of the work. 
What's more, I agree with the suggestion to "deduplicate the logic of 
PrintSinkFunction and PrintingOutputFormat", and I'd like to start that work once 
the current issue is fixed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The output of the print method of {{DataSet}} allows the user to supply a 
> String to identify the output (see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
> {{DataStream}} doesn't support this yet. It is valuable to add this feature for 
> {{DataStream}}.
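A sketch of the intended usage, assuming the {{DataStream#print(String)}} overload this issue proposes (it did not exist for {{DataStream}} at the time of this discussion):

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch only: print(String) on DataStream is the feature requested here,
// mirroring the identifier that DataSet#print already accepts.
public class PrintWithIdentifier {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3)
           .print("my-stream"); // proposed: prefix every printed record with "my-stream"

        env.execute("print-with-identifier");
    }
}
{code}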



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-10 Thread GitBox
yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method 
to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#issuecomment-412144209
 
 
   Hi @pnowojski, thanks for your suggestion. I have refactored this PR; since we 
both changed the `PrintSinkFunctionTest`, I may not have finished all of the work. 
What's more, I agree with the suggestion to "deduplicate the logic of 
PrintSinkFunction and PrintingOutputFormat", and I'd like to start that work once 
the current issue is fixed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-5232) Add a Thread default uncaught exception handler on the JobManager

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576557#comment-16576557
 ] 

ASF GitHub Bot commented on FLINK-5232:
---

yanghua commented on issue #6334: [FLINK-5232] Add a Thread default uncaught 
exception handler on the JobManager
URL: https://github.com/apache/flink/pull/6334#issuecomment-412140577
 
 
   So, should I close this PR?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a Thread default uncaught exception handler on the JobManager
> -
>
> Key: FLINK-5232
> URL: https://issues.apache.org/jira/browse/FLINK-5232
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> When some JobManager threads die because of uncaught exceptions, we should 
> bring down the JobManager. If a thread dies from an uncaught exception, there 
> is a high chance that the JobManager becomes dysfunctional.
> The only safe thing is to rely on the JobManager being restarted by YARN / 
> Mesos / Kubernetes / etc.
> I suggest to add this code to the JobManager launch:
> {code}
> Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
> @Override
> public void uncaughtException(Thread t, Throwable e) {
> try {
> LOG.error("Thread {} died due to an uncaught exception. Killing 
> process.", t.getName());
> } finally {
> Runtime.getRuntime().halt(-1);
> }
> }
> });
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5232) Add a Thread default uncaught exception handler on the JobManager

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576556#comment-16576556
 ] 

ASF GitHub Bot commented on FLINK-5232:
---

yanghua commented on issue #6334: [FLINK-5232] Add a Thread default uncaught 
exception handler on the JobManager
URL: https://github.com/apache/flink/pull/6334#issuecomment-412140446
 
 
   @tillrohrmann good job, thanks for helping me to extend it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a Thread default uncaught exception handler on the JobManager
> -
>
> Key: FLINK-5232
> URL: https://issues.apache.org/jira/browse/FLINK-5232
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> When some JobManager threads die because of uncaught exceptions, we should 
> bring down the JobManager. If a thread dies from an uncaught exception, there 
> is a high chance that the JobManager becomes dysfunctional.
> The only safe thing is to rely on the JobManager being restarted by YARN / 
> Mesos / Kubernetes / etc.
> I suggest to add this code to the JobManager launch:
> {code}
> Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
> @Override
> public void uncaughtException(Thread t, Throwable e) {
> try {
> LOG.error("Thread {} died due to an uncaught exception. Killing 
> process.", t.getName());
> } finally {
> Runtime.getRuntime().halt(-1);
> }
> }
> });
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6334: [FLINK-5232] Add a Thread default uncaught exception handler on the JobManager

2018-08-10 Thread GitBox
yanghua commented on issue #6334: [FLINK-5232] Add a Thread default uncaught 
exception handler on the JobManager
URL: https://github.com/apache/flink/pull/6334#issuecomment-412140577
 
 
   So, should I close this PR?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on issue #6334: [FLINK-5232] Add a Thread default uncaught exception handler on the JobManager

2018-08-10 Thread GitBox
yanghua commented on issue #6334: [FLINK-5232] Add a Thread default uncaught 
exception handler on the JobManager
URL: https://github.com/apache/flink/pull/6334#issuecomment-412140446
 
 
   @tillrohrmann good job, thanks for helping me to extend it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10123) Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576554#comment-16576554
 ] 

ASF GitHub Bot commented on FLINK-10123:


yanghua commented on issue #6539: [FLINK-10123] Use ExecutorThreadFactory 
instead of DefaultThreadFactory in RestServer/Client
URL: https://github.com/apache/flink/pull/6539#issuecomment-412139875
 
 
   +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client
> --
>
> Key: FLINK-10123
> URL: https://issues.apache.org/jira/browse/FLINK-10123
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Instead of using the {{DefaultThreadFactory}} in the 
> {{RestServerEndpoint}}/{{RestClient}} we should use the 
> {{ExecutorThreadFactory}} because it uses the {{FatalExitExceptionHandler}} 
> per default as the uncaught exception handler. This should guard against 
> uncaught exceptions by simply terminating the JVM.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6539: [FLINK-10123] Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client

2018-08-10 Thread GitBox
yanghua commented on issue #6539: [FLINK-10123] Use ExecutorThreadFactory 
instead of DefaultThreadFactory in RestServer/Client
URL: https://github.com/apache/flink/pull/6539#issuecomment-412139875
 
 
   +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576527#comment-16576527
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

xhumanoid commented on a change in pull request #6434: [FLINK-6968] [table] Add 
Queryable table sink.
URL: https://github.com/apache/flink/pull/6434#discussion_r209314950
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sink/queryable/QueryableTableSinkTest.scala
 ##
 @@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sink.queryable
+
+import java.time.Duration
+import java.util.concurrent.{ExecutionException, TimeUnit}
+
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.time.{Deadline, Time}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
+import org.apache.flink.queryablestate.client.QueryableStateClient
+import 
org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException
+import org.apache.flink.runtime.state.StateBackend
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
+import 
org.apache.flink.table.runtime.harness.HarnessTestBase.TestStreamQueryConfig
+import org.apache.flink.table.sinks.queryable.QueryableTableSink
+import org.apache.flink.types.Row
+import org.hamcrest.core.Is
+import org.junit.Assert._
+import org.junit.rules.{ExpectedException, TemporaryFolder}
+import org.junit.{Rule, Test}
+
+
+class QueryableTableSinkTest extends QueryableSinkTestBase {
+
+  private val queryConfig = new StreamQueryConfig()
+  queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+
+  val _tempFolder = new TemporaryFolder
+  @Rule
+  def tempFolder: TemporaryFolder = _tempFolder
+
+  val _expectedException = ExpectedException.none()
+  @Rule
+  def expectedException: ExpectedException = _expectedException
+
+  def getStateBackend: StateBackend = {
+val dbPath = tempFolder.newFolder().getAbsolutePath
+val checkpointPath = tempFolder.newFolder().toURI.toString
+val backend = new RocksDBStateBackend(checkpointPath)
+backend.setDbStoragePath(dbPath)
+backend
+  }
+
+  @Test
+  def testQueryableSink(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+//name, money
+val data = List(("jeff", -1), ("dean", -2), ("jeff", 2), ("dean", 4))
+val source = new TestKVListSource[String, Int](data)
+
+// select name, sum(money) as sm from t group by name
+val t = env.addSource(source).toTable(tEnv, 'name, 'money)
+.groupBy("name")
+.select("name, sum(money) as sm")
+
+val queryableSink = new QueryableTableSink("prefix",
+  new StreamQueryConfig().withIdleStateRetentionTime(Time.minutes(1), 
Time.minutes(7)))
+
+t.writeToSink(queryableSink)
+
+val clusterClient = 
QueryableSinkTestBase.miniClusterResource.getClusterClient
+val deadline = Deadline.now.plus(Duration.ofSeconds(100))
+
+val autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, 
env.getJavaEnv)
+val client = new QueryableStateClient("localhost", 9084)
+
 
 Review comment:
   The client needs to be shut down at the end.
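A minimal Java-style sketch of that suggestion (the test itself is Scala; only the try/finally shape matters here, and the shutdown call is assumed from the `QueryableStateClient` API):

```java
import org.apache.flink.queryablestate.client.QueryableStateClient;

// Hedged sketch: make sure the client is shut down even if the assertions throw.
public class ClientShutdownSketch {

    static void runQueries() throws Exception {
        QueryableStateClient client = new QueryableStateClient("localhost", 9084);
        try {
            // ... issue getKvState requests and assert on the results ...
        } finally {
            client.shutdownAndWait(); // release the client's resources in all cases
        }
    }
}
```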


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> 

[GitHub] xhumanoid commented on a change in pull request #6434: [FLINK-6968] [table] Add Queryable table sink.

2018-08-10 Thread GitBox
xhumanoid commented on a change in pull request #6434: [FLINK-6968] [table] Add 
Queryable table sink.
URL: https://github.com/apache/flink/pull/6434#discussion_r209314950
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sink/queryable/QueryableTableSinkTest.scala
 ##
 @@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sink.queryable
+
+import java.time.Duration
+import java.util.concurrent.{ExecutionException, TimeUnit}
+
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.time.{Deadline, Time}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
+import org.apache.flink.queryablestate.client.QueryableStateClient
+import 
org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException
+import org.apache.flink.runtime.state.StateBackend
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
+import 
org.apache.flink.table.runtime.harness.HarnessTestBase.TestStreamQueryConfig
+import org.apache.flink.table.sinks.queryable.QueryableTableSink
+import org.apache.flink.types.Row
+import org.hamcrest.core.Is
+import org.junit.Assert._
+import org.junit.rules.{ExpectedException, TemporaryFolder}
+import org.junit.{Rule, Test}
+
+
+class QueryableTableSinkTest extends QueryableSinkTestBase {
+
+  private val queryConfig = new StreamQueryConfig()
+  queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+
+  val _tempFolder = new TemporaryFolder
+  @Rule
+  def tempFolder: TemporaryFolder = _tempFolder
+
+  val _expectedException = ExpectedException.none()
+  @Rule
+  def expectedException: ExpectedException = _expectedException
+
+  def getStateBackend: StateBackend = {
+val dbPath = tempFolder.newFolder().getAbsolutePath
+val checkpointPath = tempFolder.newFolder().toURI.toString
+val backend = new RocksDBStateBackend(checkpointPath)
+backend.setDbStoragePath(dbPath)
+backend
+  }
+
+  @Test
+  def testQueryableSink(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+//name, money
+val data = List(("jeff", -1), ("dean", -2), ("jeff", 2), ("dean", 4))
+val source = new TestKVListSource[String, Int](data)
+
+// select name, sum(money) as sm from t group by name
+val t = env.addSource(source).toTable(tEnv, 'name, 'money)
+.groupBy("name")
+.select("name, sum(money) as sm")
+
+val queryableSink = new QueryableTableSink("prefix",
+  new StreamQueryConfig().withIdleStateRetentionTime(Time.minutes(1), 
Time.minutes(7)))
+
+t.writeToSink(queryableSink)
+
+val clusterClient = 
QueryableSinkTestBase.miniClusterResource.getClusterClient
+val deadline = Deadline.now.plus(Duration.ofSeconds(100))
+
+val autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, 
env.getJavaEnv)
+val client = new QueryableStateClient("localhost", 9084)
+
 
 Review comment:
   The client needs to be shut down at the end.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-5315) Support distinct aggregations in table api

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576517#comment-16576517
 ] 

ASF GitHub Bot commented on FLINK-5315:
---

walterddr commented on a change in pull request #6521: [FLINK-5315][table] 
Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r209313630
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
 ##
 @@ -136,4 +136,11 @@ abstract class AggregateFunction[T, ACC] extends 
UserDefinedFunction {
 * accumulator type should be automatically inferred.
 */
   def getAccumulatorType: TypeInformation[ACC] = null
+
+  private[flink] var isDistinctAgg: Boolean = false
+
+  private[flink] def distinct: AggregateFunction[T, ACC] = {
 
 Review comment:
   This is not going to work as it modifies an underlying field of this 
particular AggregateFunction object. 
   For example: 
   ```
   table.select(udagg.distinct('a), udagg('a))
   ```
   will return the same result in both columns because the distinct modifier has 
been added to this particular `udagg` element. This is a blunder on my end, and 
I should fix this before further reviews can be conducted. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support distinct aggregations in table api
> --
>
> Key: FLINK-5315
> URL: https://issues.apache.org/jira/browse/FLINK-5315
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Kurt Young
>Assignee: Rong Rong
>Priority: Major
>  Labels: pull-request-available
>
> Support distinct aggregations in Table API in the following format:
> For Expressions:
> {code:scala}
> 'a.count.distinct // Expressions distinct modifier
> {code}
> For User-defined Function:
> {code:scala}
> singleArgUdaggFunc.distinct('a) // FunctionCall distinct modifier
> multiArgUdaggFunc.distinct('a, 'b) // FunctionCall distinct modifier
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] walterddr commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream

2018-08-10 Thread GitBox
walterddr commented on a change in pull request #6521: [FLINK-5315][table] 
Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r209313630
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
 ##
 @@ -136,4 +136,11 @@ abstract class AggregateFunction[T, ACC] extends 
UserDefinedFunction {
 * accumulator type should be automatically inferred.
 */
   def getAccumulatorType: TypeInformation[ACC] = null
+
+  private[flink] var isDistinctAgg: Boolean = false
+
+  private[flink] def distinct: AggregateFunction[T, ACC] = {
 
 Review comment:
   This is not going to work as it modifies an underlying field of this 
particular AggregateFunction object. 
   For example: 
   ```
   table.select(udagg.distinct('a), udagg('a))
   ```
   will return the same result in both columns because the distinct modifier has 
been added to this particular `udagg` element. This is a blunder on my end, and 
I should fix this before further reviews can be conducted. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-10001) Improve Kubernetes documentation

2018-08-10 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-10001:
-

Assignee: Till Rohrmann

> Improve Kubernetes documentation
> 
>
> Key: FLINK-10001
> URL: https://issues.apache.org/jira/browse/FLINK-10001
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Kubernetes
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.6.0
>
>
> We should update Flink's K8s documentation. This includes running it on 
> {{MiniKube}} as well as on a K8s cluster.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576452#comment-16576452
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

xhumanoid commented on a change in pull request #6434: [FLINK-6968] [table] Add 
Queryable table sink.
URL: https://github.com/apache/flink/pull/6434#discussion_r209300017
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/queryable/QueryableTableSink.scala
 ##
 @@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks.queryable
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.sinks.{TableSinkBase, UpsertStreamTableSink}
+import org.apache.flink.types.Row
+
+/**
+  * A QueryableTableSink stores table in queryable state.
+  *
+  * This class stores table in queryable state so that users can access table 
data without
+  * dependency on external storage.
+  *
+  * Since this is only a kv storage, currently user can only do point query 
against it.
+  *
+  * Example:
+  * {{{
+  *   val env = ExecutionEnvironment.getExecutionEnvironment
+  *   val tEnv = TableEnvironment.getTableEnvironment(env)
+  *
+  *   val table: Table  = ...
+  *
+  *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
+  *   "prefix",
+  *   queryConfig)
+  *
+  *   tEnv.writeToSink(table, queryableTableSink, config)
+  * }}}
+  *
+  * When program starts to run, user can access state with 
QueryableStateClient.
+  * {{{
+  *   val client = new QueryableStateClient(tmHostname, proxyPort)
+  *   val data = client.getKvState(
+  *   jobId,
+  *   "prefix-column1",
+  *   Row.of(1),
+  *   new RowTypeInfo(Array(TypeInfoformation.of(classOf[Int]), 
Array("id"))
+  *   stateDescriptor)
+  * .get();
+  *
+  * }}}
+  *
+  *
+  * @param namePrefix
+  * @param queryConfig
+  */
+class QueryableTableSink(
+  private val namePrefix: String,
+  private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be used 
with append-only " +
+"tables as the table would grow infinitely")
+}
+  }
+
+  override def getRecordType: TypeInformation[Row] = new 
RowTypeInfo(getFieldTypes, getFieldNames)
+
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): 
Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new QueryableStateProcessFunction(
+  namePrefix,
+  queryConfig,
+  keys,
+  getFieldNames,
+  getFieldTypes)
+
+dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
+  .process(processFunction)
+  }
+
+  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = {
+new QueryableTableSink(this.namePrefix, this.queryConfig)
+  }
+}
+
+
+
+
 
 Review comment:
   Remove this block of blank lines.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Store streaming, updating tables with unique key in queryable state
> 

[GitHub] xhumanoid commented on a change in pull request #6434: [FLINK-6968] [table] Add Queryable table sink.

2018-08-10 Thread GitBox
xhumanoid commented on a change in pull request #6434: [FLINK-6968] [table] Add 
Queryable table sink.
URL: https://github.com/apache/flink/pull/6434#discussion_r209300017
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/queryable/QueryableTableSink.scala
 ##
 @@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks.queryable
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.sinks.{TableSinkBase, UpsertStreamTableSink}
+import org.apache.flink.types.Row
+
+/**
+  * A QueryableTableSink stores table in queryable state.
+  *
+  * This class stores table in queryable state so that users can access table 
data without
+  * dependency on external storage.
+  *
+  * Since this is only a kv storage, currently user can only do point query 
against it.
+  *
+  * Example:
+  * {{{
+  *   val env = ExecutionEnvironment.getExecutionEnvironment
+  *   val tEnv = TableEnvironment.getTableEnvironment(env)
+  *
+  *   val table: Table  = ...
+  *
+  *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
+  *   "prefix",
+  *   queryConfig)
+  *
+  *   tEnv.writeToSink(table, queryableTableSink, config)
+  * }}}
+  *
+  * When program starts to run, user can access state with 
QueryableStateClient.
+  * {{{
+  *   val client = new QueryableStateClient(tmHostname, proxyPort)
+  *   val data = client.getKvState(
+  *   jobId,
+  *   "prefix-column1",
+  *   Row.of(1),
+  *   new RowTypeInfo(Array(TypeInfoformation.of(classOf[Int]), 
Array("id"))
+  *   stateDescriptor)
+  * .get();
+  *
+  * }}}
+  *
+  *
+  * @param namePrefix
+  * @param queryConfig
+  */
+class QueryableTableSink(
+  private val namePrefix: String,
+  private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+  with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  override def setKeyFields(keys: Array[String]): Unit = {
+    if (keys == null) {
+      throw new IllegalArgumentException("keys can't be null!")
+    }
+    this.keys = keys
+  }
+
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+    if (isAppendOnly) {
+      throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " +
+        "tables as the table would grow infinitely")
+    }
+  }
+
+  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
+
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
+    val keyIndices = keys.map(getFieldNames.indexOf(_))
+    val keyTypes = keyIndices.map(getFieldTypes(_))
+
+    val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+    val processFunction = new QueryableStateProcessFunction(
+      namePrefix,
+      queryConfig,
+      keys,
+      getFieldNames,
+      getFieldTypes)
+
+    dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
+      .process(processFunction)
+  }
+
+  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = {
+    new QueryableTableSink(this.namePrefix, this.queryConfig)
+  }
+}
+
+
+
+
 
 Review comment:
   remove block of blank lines


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Resolved] (FLINK-9795) Update Mesos documentation for flip6

2018-08-10 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9795.
--
   Resolution: Fixed
Fix Version/s: 1.7.0
   1.6.1

Fixed via
1.7.0: a442eb6c0388558c6fb2e5e616cd1cd15038b95c
1.6.1: 45cffa17252c5fb9bc38a6b771c2a75aaa8c10ee

> Update Mesos documentation for flip6
> 
>
> Key: FLINK-9795
> URL: https://issues.apache.org/jira/browse/FLINK-9795
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Leonid Ishimnikov
>Assignee: Gary Yao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.1, 1.7.0
>
>
> Mesos documentation would benefit from an overhaul after flip6 became the 
> default cluster management model.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on issue #6538: [hotfix][streaming] Fix and simplify PrintSinkFunctionTest

2018-08-10 Thread GitBox
pnowojski commented on issue #6538: [hotfix][streaming] Fix and simplify 
PrintSinkFunctionTest
URL: https://github.com/apache/flink/pull/6538#issuecomment-412118428
 
 
   Failure is still there. It seems like there is some travis failure, because 
many branches (including master and release-1.5) are failing with similar 
errors.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10105) Test failure because of jobmanager.execution.failover-strategy is outdated

2018-08-10 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-10105:
--
Fix Version/s: 1.6.1

> Test failure because of jobmanager.execution.failover-strategy is outdated
> --
>
> Key: FLINK-10105
> URL: https://issues.apache.org/jira/browse/FLINK-10105
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: vinoyang
>Assignee: Dawid Wysakowicz
>Priority: Major
> Fix For: 1.6.1, 1.7.0
>
>
> {code:java}
> Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.745 sec <<< 
> FAILURE! - in 
> org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase
> testFullReferenceCompleteness(org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase)
>   Time elapsed: 0.693 sec  <<< FAILURE!
> java.lang.AssertionError: Documentation is outdated, please regenerate it 
> according to the instructions in flink-docs/README.md.
>   Problems:
>   Documented description of 
> jobmanager.execution.failover-strategy in class 
> org.apache.flink.configuration.JobManagerOptions is outdated.
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase.compareDocumentedAndExistingOptions(ConfigOptionsDocsCompletenessITCase.java:118)
>   at 
> org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase.testFullReferenceCompleteness(ConfigOptionsDocsCompletenessITCase.java:76)
> Results :
> Failed tests: 
>   
> ConfigOptionsDocsCompletenessITCase.testFullReferenceCompleteness:76->compareDocumentedAndExistingOptions:118
>  Documentation is outdated, please regenerate it according to the 
> instructions in flink-docs/README.md.
>   Problems:
>   Documented description of 
> jobmanager.execution.failover-strategy in class 
> org.apache.flink.configuration.JobManagerOptions is outdated.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-10105) Test failure because of jobmanager.execution.failover-strategy is outdated

2018-08-10 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-10105.
---
Resolution: Fixed

1.6.1: fbcc4965b77488aca65ae833c09b85a7f3c79272

> Test failure because of jobmanager.execution.failover-strategy is outdated
> --
>
> Key: FLINK-10105
> URL: https://issues.apache.org/jira/browse/FLINK-10105
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: vinoyang
>Assignee: Dawid Wysakowicz
>Priority: Major
> Fix For: 1.6.1, 1.7.0
>
>
> {code:java}
> Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.745 sec <<< 
> FAILURE! - in 
> org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase
> testFullReferenceCompleteness(org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase)
>   Time elapsed: 0.693 sec  <<< FAILURE!
> java.lang.AssertionError: Documentation is outdated, please regenerate it 
> according to the instructions in flink-docs/README.md.
>   Problems:
>   Documented description of 
> jobmanager.execution.failover-strategy in class 
> org.apache.flink.configuration.JobManagerOptions is outdated.
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase.compareDocumentedAndExistingOptions(ConfigOptionsDocsCompletenessITCase.java:118)
>   at 
> org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase.testFullReferenceCompleteness(ConfigOptionsDocsCompletenessITCase.java:76)
> Results :
> Failed tests: 
>   
> ConfigOptionsDocsCompletenessITCase.testFullReferenceCompleteness:76->compareDocumentedAndExistingOptions:118
>  Documentation is outdated, please regenerate it according to the 
> instructions in flink-docs/README.md.
>   Problems:
>   Documented description of 
> jobmanager.execution.failover-strategy in class 
> org.apache.flink.configuration.JobManagerOptions is outdated.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-10105) Test failure because of jobmanager.execution.failover-strategy is outdated

2018-08-10 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reopened FLINK-10105:
---

> Test failure because of jobmanager.execution.failover-strategy is outdated
> --
>
> Key: FLINK-10105
> URL: https://issues.apache.org/jira/browse/FLINK-10105
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: vinoyang
>Assignee: Dawid Wysakowicz
>Priority: Major
> Fix For: 1.6.1, 1.7.0
>
>
> {code:java}
> Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.745 sec <<< 
> FAILURE! - in 
> org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase
> testFullReferenceCompleteness(org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase)
>   Time elapsed: 0.693 sec  <<< FAILURE!
> java.lang.AssertionError: Documentation is outdated, please regenerate it 
> according to the instructions in flink-docs/README.md.
>   Problems:
>   Documented description of 
> jobmanager.execution.failover-strategy in class 
> org.apache.flink.configuration.JobManagerOptions is outdated.
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase.compareDocumentedAndExistingOptions(ConfigOptionsDocsCompletenessITCase.java:118)
>   at 
> org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase.testFullReferenceCompleteness(ConfigOptionsDocsCompletenessITCase.java:76)
> Results :
> Failed tests: 
>   
> ConfigOptionsDocsCompletenessITCase.testFullReferenceCompleteness:76->compareDocumentedAndExistingOptions:118
>  Documentation is outdated, please regenerate it according to the 
> instructions in flink-docs/README.md.
>   Problems:
>   Documented description of 
> jobmanager.execution.failover-strategy in class 
> org.apache.flink.configuration.JobManagerOptions is outdated.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] xhumanoid commented on a change in pull request #6434: [FLINK-6968] [table] Add Queryable table sink.

2018-08-10 Thread GitBox
xhumanoid commented on a change in pull request #6434: [FLINK-6968] [table] Add 
Queryable table sink.
URL: https://github.com/apache/flink/pull/6434#discussion_r209297911
 
 

 ##
 File path: flink-libraries/flink-table/pom.xml
 ##
 @@ -186,6 +186,13 @@ under the License.
			<scope>test</scope>
			<type>test-jar</type>
		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-queryable-state-runtime_2.11</artifactId>
 
 Review comment:
   replace hardcoded 2.11 with ${scala.binary.version}
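   For illustration, the suggested form of the dependency entry might look like the sketch below
   (the version/scope lines of the actual block are not visible in the quoted diff):
   ```xml
   <dependency>
   	<groupId>org.apache.flink</groupId>
   	<!-- let Maven resolve the Scala suffix instead of hardcoding 2.11 -->
   	<artifactId>flink-queryable-state-runtime_${scala.binary.version}</artifactId>
   </dependency>
   ```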


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576440#comment-16576440
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

xhumanoid commented on a change in pull request #6434: [FLINK-6968] [table] Add 
Queryable table sink.
URL: https://github.com/apache/flink/pull/6434#discussion_r209297911
 
 

 ##
 File path: flink-libraries/flink-table/pom.xml
 ##
 @@ -186,6 +186,13 @@ under the License.
			<scope>test</scope>
			<type>test-jar</type>
		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-queryable-state-runtime_2.11</artifactId>
 
 Review comment:
   replace hardcoded 2.11 with ${scala.binary.version}


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
>
> Streaming tables with unique key are continuously updated. For example 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make it accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flnk's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5232) Add a Thread default uncaught exception handler on the JobManager

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576436#comment-16576436
 ] 

ASF GitHub Bot commented on FLINK-5232:
---

tillrohrmann commented on issue #6334: [FLINK-5232] Add a Thread default 
uncaught exception handler on the JobManager
URL: https://github.com/apache/flink/pull/6334#issuecomment-412116926
 
 
   Hi @yanghua, I think your approach goes in the right direction. Extending 
`ActorSystemImpl` allows us to set a custom uncaught exception handler. It is a 
bit tricky since we have to put our own `RobustActorSystem` in the same package 
as `ActorSystemImpl`. But I think this is ok since it only affects a single 
class. I've taken your work and extended it a bit in #6539 (second commit). 
What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a Thread default uncaught exception handler on the JobManager
> -
>
> Key: FLINK-5232
> URL: https://issues.apache.org/jira/browse/FLINK-5232
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> When some JobManager threads die because of uncaught exceptions, we should 
> bring down the JobManager. If a thread dies from an uncaught exception, there 
> is a high chance that the JobManager becomes dysfunctional.
> The only safe thing is to rely on the JobManager being restarted by YARN / 
> Mesos / Kubernetes / etc.
> I suggest to add this code to the JobManager launch:
> {code}
> Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
> @Override
> public void uncaughtException(Thread t, Throwable e) {
> try {
> LOG.error("Thread {} died due to an uncaught exception. Killing 
> process.", t.getName());
> } finally {
> Runtime.getRuntime().halt(-1);
> }
> }
> });
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on issue #6334: [FLINK-5232] Add a Thread default uncaught exception handler on the JobManager

2018-08-10 Thread GitBox
tillrohrmann commented on issue #6334: [FLINK-5232] Add a Thread default 
uncaught exception handler on the JobManager
URL: https://github.com/apache/flink/pull/6334#issuecomment-412116926
 
 
   Hi @yanghua, I think your approach goes in the right direction. Extending 
`ActorSystemImpl` allows us to set a custom uncaught exception handler. It is a 
bit tricky since we have to put our own `RobustActorSystem` in the same package 
as `ActorSystemImpl`. But I think this is ok since it only affects a single 
class. I've taken your work and extended it a bit in #6539 (second commit). 
What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6538: [hotfix][streaming] Fix and simplify PrintSinkFunctionTest

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6538: [hotfix][streaming] Fix 
and simplify PrintSinkFunctionTest
URL: https://github.com/apache/flink/pull/6538#discussion_r209296461
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ##
 @@ -20,106 +20,84 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
 
 import org.junit.After;
-import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 
 import static org.junit.Assert.assertEquals;
 
 /**
- * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
+ * Tests for the {@link PrintSinkFunction}.
  */
 public class PrintSinkFunctionTest {
 
-   public PrintStream printStreamOriginal = System.out;
-   private String line = System.lineSeparator();
+   private final PrintStream originalSystemOut = System.out;
+   private final PrintStream originalSystemErr = System.err;
 
-   @Test
-   public void testPrintSinkStdOut() throws Exception {
-   ByteArrayOutputStream baos = new ByteArrayOutputStream();
-   PrintStream stream = new PrintStream(baos);
-   System.setOut(stream);
+   private final ByteArrayOutputStream arrayOutputStream = new 
ByteArrayOutputStream();
+   private final ByteArrayOutputStream arrayErrorStream = new 
ByteArrayOutputStream();
 
-   final StreamingRuntimeContext ctx = 
Mockito.mock(StreamingRuntimeContext.class);
+   private final String line = System.lineSeparator();
 
-   PrintSinkFunction printSink = new PrintSinkFunction<>();
-   printSink.setRuntimeContext(ctx);
-   try {
-   printSink.open(new Configuration());
-   } catch (Exception e) {
-   Assert.fail();
+   @Before
+   public void setUp() {
+   System.setOut(new PrintStream(arrayOutputStream));
+   System.setErr(new PrintStream(arrayErrorStream));
+   }
+
+   @After
+   public void tearDown() {
+   if (System.out != originalSystemOut) {
+   System.out.close();
}
+   if (System.err != originalSystemErr) {
+   System.err.close();
+   }
+   System.setOut(originalSystemOut);
+   System.setErr(originalSystemErr);
+   }
+
+   @Test
+   public void testPrintSinkStdOut() throws Exception {
+   PrintSinkFunction printSink = new PrintSinkFunction<>();
+   printSink.setRuntimeContext(new 
MockStreamingRuntimeContext(false, 1, 0));
printSink.setTargetToStandardOut();
+   printSink.open(new Configuration());
+
printSink.invoke("hello world!", 
SinkContextUtil.forTimestamp(0));
 
assertEquals("Print to System.out", printSink.toString());
-   assertEquals("hello world!" + line, baos.toString());
-
-   printSink.close();
 
 Review comment:
   Btw, is there a reason why `RichFunction` doesn't implement `AutoCloseable`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10123) Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576433#comment-16576433
 ] 

ASF GitHub Bot commented on FLINK-10123:


tillrohrmann opened a new pull request #6539: [FLINK-10123] Use 
ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client
URL: https://github.com/apache/flink/pull/6539
 
 
   ## What is the purpose of the change
   
   Using the ExecutorThreadFactory hardens the system because it uses the 
FatalExitExceptionHandler
   as UncaughtExceptionHandler which terminates the JVM in case of an exception.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
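   
   For illustration only, a minimal sketch of the general pattern (hypothetical class, not the actual
   Flink `ExecutorThreadFactory`): a `ThreadFactory` whose threads get an uncaught exception handler
   that halts the JVM, so a thread cannot die silently.
   
   ```java
   import java.util.concurrent.ThreadFactory;
   import java.util.concurrent.atomic.AtomicInteger;
   
   public class ExitOnUncaughtExceptionThreadFactory implements ThreadFactory {
   
   	private final String poolName;
   	private final AtomicInteger counter = new AtomicInteger();
   
   	public ExitOnUncaughtExceptionThreadFactory(String poolName) {
   		this.poolName = poolName;
   	}
   
   	@Override
   	public Thread newThread(Runnable runnable) {
   		Thread t = new Thread(runnable, poolName + "-thread-" + counter.incrementAndGet());
   		// every created thread terminates the process on an uncaught exception,
   		// so the resource manager (YARN/Mesos/Kubernetes) can restart it
   		t.setUncaughtExceptionHandler((thread, error) -> {
   			System.err.println("Thread " + thread.getName() + " died: " + error);
   			Runtime.getRuntime().halt(-1);
   		});
   		return t;
   	}
   }
   ```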
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client
> --
>
> Key: FLINK-10123
> URL: https://issues.apache.org/jira/browse/FLINK-10123
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Instead of using the {{DefaultThreadFactory}} in the 
> {{RestServerEndpoint}}/{{RestClient}} we should use the 
> {{ExecutorThreadFactory}} because it uses the {{FatalExitExceptionHandler}} 
> per default as the uncaught exception handler. This should guard against 
> uncaught exceptions by simply terminating the JVM.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann opened a new pull request #6539: [FLINK-10123] Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client

2018-08-10 Thread GitBox
tillrohrmann opened a new pull request #6539: [FLINK-10123] Use 
ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client
URL: https://github.com/apache/flink/pull/6539
 
 
   ## What is the purpose of the change
   
   Using the ExecutorThreadFactory hardens the system because it uses the 
FatalExitExceptionHandler
   as UncaughtExceptionHandler which terminates the JVM in case of an exception.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10123) Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client

2018-08-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10123:
---
Labels: pull-request-available  (was: )

> Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client
> --
>
> Key: FLINK-10123
> URL: https://issues.apache.org/jira/browse/FLINK-10123
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Instead of using the {{DefaultThreadFactory}} in the 
> {{RestServerEndpoint}}/{{RestClient}} we should use the 
> {{ExecutorThreadFactory}} because it uses the {{FatalExitExceptionHandler}} 
> per default as the uncaught exception handler. This should guard against 
> uncaught exceptions by simply terminating the JVM.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10123) Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client

2018-08-10 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-10123:
-

 Summary: Use ExecutorThreadFactory instead of DefaultThreadFactory 
in RestServer/Client
 Key: FLINK-10123
 URL: https://issues.apache.org/jira/browse/FLINK-10123
 Project: Flink
  Issue Type: Improvement
  Components: REST
Affects Versions: 1.6.0, 1.7.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.7.0


Instead of using the {{DefaultThreadFactory}} in the 
{{RestServerEndpoint}}/{{RestClient}} we should use the 
{{ExecutorThreadFactory}} because it uses the {{FatalExitExceptionHandler}} per 
default as the uncaught exception handler. This should guard against uncaught 
exceptions by simply terminating the JVM.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576424#comment-16576424
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209284512
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -188,24 +192,32 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
 
/**
 * Marks the current {@link BufferBuilder} as finished and clears the 
state for next one.
-*
-* @return true if some data were written
 */
-   private boolean tryFinishCurrentBufferBuilder(int targetChannel, 
RecordSerializer serializer) {
-
-   if (!bufferBuilders[targetChannel].isPresent()) {
-   return false;
+   private void tryFinishCurrentBufferBuilder(int targetChannel) {
+   if (bufferBuilders[targetChannel].isPresent()) {
+   BufferBuilder bufferBuilder = 
bufferBuilders[targetChannel].get();
+   bufferBuilders[targetChannel] = Optional.empty();
+   numBytesOut.inc(bufferBuilder.finish());
}
-   BufferBuilder bufferBuilder = 
bufferBuilders[targetChannel].get();
-   bufferBuilders[targetChannel] = Optional.empty();
+   }
 
-   numBytesOut.inc(bufferBuilder.finish());
-   serializer.clear();
-   return true;
+   /**
+* The {@link BufferBuilder} may already exist if not filled up last 
time, otherwise we need
+* request a new one for this target channel.
+*/
+   @Nonnull
 
 Review comment:
   Imo you don't have to add `@Nonnull` annotation. I'm implicitly assuming 
that any non `@Nullable` marked field is automatically `@Nonnull`.
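   
   A tiny sketch of that convention (hypothetical class, using the `javax.annotation` annotations):
   
   ```java
   import javax.annotation.Nullable;
   
   class ConventionExample {
   	@Nullable
   	private String optionalName;      // may be null, hence annotated explicitly
   
   	private String requiredName = ""; // unannotated fields are treated as non-null
   }
   ```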


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.
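
A minimal sketch of the idea described above (hypothetical types and names, not the actual {{RecordWriter}}/{{RecordSerializer}} classes): serialize the record once into an intermediate byte array, then copy only those bytes into each selected channel's buffer.

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

class SerializeOnceSketch {

	static void emit(String record, List<ByteBuffer> perChannelBuffers, int[] selectedChannels) {
		// 1) serialize the record only once into an intermediate buffer
		byte[] serialized = record.getBytes(StandardCharsets.UTF_8);

		// 2) copy the already serialized bytes into every selected channel's buffer
		for (int channel : selectedChannels) {
			perChannelBuffers.get(channel).put(serialized);
		}
	}
}
{code}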



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576425#comment-16576425
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209290889
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 ##
 @@ -66,29 +66,33 @@ public boolean isFullBuffer() {
}
 
/**
-* Starts serializing and copying the given record to the target buffer
-* (if available).
+* Starts serializing the given record to an intermediate data buffer.
 *
 * @param record the record to serialize
-* @return how much information was written to the target buffer and
-* whether this buffer is full
 */
-   SerializationResult addRecord(T record) throws IOException;
+   void serializeRecord(T record) throws IOException;
 
 Review comment:
   I'm thinking about refactoring this class and splitting it into two:
   ```
   class RecordSerializer {
SerializedRecord serializeRecord(T record);
   };
   
    class SerializedRecord implements AutoCloseable {
 CopingResult copyToBufferBuilder(BufferBuilder bufferBuilder);
   
 void close() {
serializer.prune();
// and code to return state (serializationBuffer) to serializer for 
reuse
 }
   }
   ```
   
   and usage:
   ```
public void randomEmit(T record) throws IOException, 
InterruptedException {
try (SerializedRecord serializedRecord = 
serializer.serializeRecord(record)) {
copyToTarget(serializedRecord, 
rng.nextInt(numChannels));
}
}
   ```
   
   Something about the current `RecordSerializer` always was/is tickling my brain: it is 
confusing to me and I have to always check its implementation whenever I 
revisit the code. Maybe with this split it would be easier to understand? But 
I'm not sure about this. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576422#comment-16576422
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209290136
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -524,6 +615,31 @@ public void read(DataInputView in) throws IOException {
}
}
 
+   /**
+* Broadcast channel selector that selects all the output channels.
+*/
+   private static class Broadcast implements 
ChannelSelector {
+
+   private int[] returnChannel;
+   boolean set;
 
 Review comment:
   1. do we need to cache `returnChannel`? Does it give any meaningful test 
execution speed up?
   2. if so, instead of using `set` and `setNumber`, just check whether 
`returnChannel.length == numberOfOutputChannels`. If not, create new one.
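   
   A self-contained sketch of that suggestion (the selector interface shape and the names are
   assumptions for illustration, not the actual Flink API):
   
   ```java
   import java.util.stream.IntStream;
   
   interface SimpleChannelSelector<T> {
   	int[] selectChannels(T record, int numberOfOutputChannels);
   }
   
   // Broadcast selector that lazily (re)builds the cached channel array: no extra "set" flag,
   // just compare the cached length with the requested number of channels.
   class BroadcastSelectorSketch<T> implements SimpleChannelSelector<T> {
   
   	private int[] returnChannel = new int[0];
   
   	@Override
   	public int[] selectChannels(T record, int numberOfOutputChannels) {
   		if (returnChannel.length != numberOfOutputChannels) {
   			returnChannel = IntStream.range(0, numberOfOutputChannels).toArray();
   		}
   		return returnChannel;
   	}
   }
   ```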


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing outputs, that 
> means the output will be serialized as many times as the number of selected 
> channels.
> As we know, data serialization is a high cost operation, so we can get good 
> benefits by improving the serialization only once.
> I would suggest the following changes for realizing it.
>  # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the 
> channels.
>  # The output is serialized into the intermediate data buffer only once for 
> different channels.
>  # The intermediate serialization results are copied into different 
> {{BufferBuilder}}s for different channels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576423#comment-16576423
 ] 

ASF GitHub Bot commented on FLINK-9913:
---

pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209289603
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws 
Exception {
assertEquals("Buffer 2 shares the same reader index as buffer 
1", 0, buffer2.getReaderIndex());
}
 
+   /**
+* Tests that records are broadcast via {@link ChannelSelector} and
+* {@link RecordWriter#emit(IOReadableWritable)}.
+*/
+   @Test
+   public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+   }
+
+   /**
+* Tests that records are broadcast via {@link 
RecordWriter#broadcastEmit(IOReadableWritable)}.
+*/
+   @Test
+   public void testBroadcastEmitRecord() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+   }
+
+   /**
+* The results of emitting records via BroadcastPartitioner or 
broadcasting records directly are the same,
+* that is all the target channels can receive the whole outputs.
+*
+* @param isBroadcastEmit whether using {@link 
RecordWriter#broadcastEmit(IOReadableWritable)} or not
+*/
+   private void 
emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean 
isBroadcastEmit) throws Exception {
+   final int numChannels = 4;
+   final int bufferSize = 32;
+   final int numValues = 8;
+   final int serializationLength = 4;
+
+   @SuppressWarnings("unchecked")
+   final Queue[] queues = new Queue[numChannels];
+   for (int i = 0; i < numChannels; i++) {
+   queues[i] = new ArrayDeque<>();
+   }
+
+   final TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+   final ResultPartitionWriter partitionWriter = new 
CollectingPartitionWriter(queues, bufferProvider);
+   final RecordWriter writer = new 
RecordWriter<>(partitionWriter, new Broadcast<>());
+   final RecordDeserializer deserializer = 
new SpillingAdaptiveSpanningRecordDeserializer<>(
+   new String[]{ tempFolder.getRoot().getAbsolutePath() });
+
+   final ArrayDeque serializedRecords = new 
ArrayDeque<>();
+   final Iterable records = 
Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
+   for (SerializationTestType record : records) {
+   serializedRecords.add(record);
+
+   if (isBroadcastEmit) {
+   writer.broadcastEmit(record);
+   } else {
+   writer.emit(record);
+   }
+   }
+
+   final int requiredBuffers = numValues / (bufferSize / (4 + 
serializationLength));
+   for (int i = 0; i < numChannels; i++) {
 
 Review comment:
   can you somehow extract common logic of this method and 
`org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest#testSerializationRoundTrip(java.lang.Iterable,
 int, 
org.apache.flink.runtime.io.network.api.serialization.RecordSerializer,
 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer)`?
 They share a lot of code.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve output serialization only once in RecordWriter
> --
>
> Key: FLINK-9913
> URL: https://issues.apache.org/jira/browse/FLINK-9913
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently the {{RecordWriter}} emits output into multi channels via 
> {{ChannelSelector}}  or broadcasts output to all channels directly. Each 
> channel has a separate {{RecordSerializer}} for serializing 

[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209290889
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 ##
 @@ -66,29 +66,33 @@ public boolean isFullBuffer() {
}
 
/**
-* Starts serializing and copying the given record to the target buffer
-* (if available).
+* Starts serializing the given record to an intermediate data buffer.
 *
 * @param record the record to serialize
-* @return how much information was written to the target buffer and
-* whether this buffer is full
 */
-   SerializationResult addRecord(T record) throws IOException;
+   void serializeRecord(T record) throws IOException;
 
 Review comment:
   I'm thinking about refactoring this class and splitting it into two:
   ```
   class RecordSerializer {
SerializedRecord serializeRecord(T record);
   };
   
    class SerializedRecord implements AutoCloseable {
 CopingResult copyToBufferBuilder(BufferBuilder bufferBuilder);
   
 void close() {
serializer.prune();
// and code to return state (serializationBuffer) to serializer for 
reuse
 }
   }
   ```
   
   and usage:
   ```
public void randomEmit(T record) throws IOException, 
InterruptedException {
try (SerializedRecord serializedRecord = 
serializer.serializeRecord(record)) {
copyToTarget(serializedRecord, 
rng.nextInt(numChannels));
}
}
   ```
   
   Something about the current `RecordSerializer` always was/is tickling my brain: it is 
confusing to me and I have to always check its implementation whenever I 
revisit the code. Maybe with this split it would be easier to understand? But 
I'm not sure about this. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209290136
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -524,6 +615,31 @@ public void read(DataInputView in) throws IOException {
}
}
 
+   /**
+* Broadcast channel selector that selects all the output channels.
+*/
+   private static class Broadcast implements 
ChannelSelector {
+
+   private int[] returnChannel;
+   boolean set;
 
 Review comment:
   1. do we need to cache `returnChannel`? Does it give any meaningful test 
execution speed up?
   2. if so, instead of using `set` and `setNumber`, just check whether 
`returnChannel.length == numberOfOutputChannels`. If not, create new one.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209284512
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ##
 @@ -188,24 +192,32 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
 
/**
 * Marks the current {@link BufferBuilder} as finished and clears the 
state for next one.
-*
-* @return true if some data were written
 */
-   private boolean tryFinishCurrentBufferBuilder(int targetChannel, 
RecordSerializer serializer) {
-
-   if (!bufferBuilders[targetChannel].isPresent()) {
-   return false;
+   private void tryFinishCurrentBufferBuilder(int targetChannel) {
+   if (bufferBuilders[targetChannel].isPresent()) {
+   BufferBuilder bufferBuilder = 
bufferBuilders[targetChannel].get();
+   bufferBuilders[targetChannel] = Optional.empty();
+   numBytesOut.inc(bufferBuilder.finish());
}
-   BufferBuilder bufferBuilder = 
bufferBuilders[targetChannel].get();
-   bufferBuilders[targetChannel] = Optional.empty();
+   }
 
-   numBytesOut.inc(bufferBuilder.finish());
-   serializer.clear();
-   return true;
+   /**
+* The {@link BufferBuilder} may already exist if not filled up last 
time, otherwise we need
+* request a new one for this target channel.
+*/
+   @Nonnull
 
 Review comment:
   Imo you don't have to add `@Nonnull` annotation. I'm implicitly assuming 
that any non `@Nullable` marked field is automatically `@Nonnull`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] 
Improve output serialization only once in RecordWriter
URL: https://github.com/apache/flink/pull/6417#discussion_r209289603
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##
 @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws 
Exception {
assertEquals("Buffer 2 shares the same reader index as buffer 
1", 0, buffer2.getReaderIndex());
}
 
+   /**
+* Tests that records are broadcast via {@link ChannelSelector} and
+* {@link RecordWriter#emit(IOReadableWritable)}.
+*/
+   @Test
+   public void testEmitRecordWithBroadcastPartitioner() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false);
+   }
+
+   /**
+* Tests that records are broadcast via {@link 
RecordWriter#broadcastEmit(IOReadableWritable)}.
+*/
+   @Test
+   public void testBroadcastEmitRecord() throws Exception {
+   emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true);
+   }
+
+   /**
+* The results of emitting records via BroadcastPartitioner or 
broadcasting records directly are the same,
+* that is all the target channels can receive the whole outputs.
+*
+* @param isBroadcastEmit whether using {@link 
RecordWriter#broadcastEmit(IOReadableWritable)} or not
+*/
+   private void 
emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean 
isBroadcastEmit) throws Exception {
+   final int numChannels = 4;
+   final int bufferSize = 32;
+   final int numValues = 8;
+   final int serializationLength = 4;
+
+   @SuppressWarnings("unchecked")
+   final Queue[] queues = new Queue[numChannels];
+   for (int i = 0; i < numChannels; i++) {
+   queues[i] = new ArrayDeque<>();
+   }
+
+   final TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
+   final ResultPartitionWriter partitionWriter = new 
CollectingPartitionWriter(queues, bufferProvider);
+   final RecordWriter writer = new 
RecordWriter<>(partitionWriter, new Broadcast<>());
+   final RecordDeserializer deserializer = 
new SpillingAdaptiveSpanningRecordDeserializer<>(
+   new String[]{ tempFolder.getRoot().getAbsolutePath() });
+
+   final ArrayDeque serializedRecords = new 
ArrayDeque<>();
+   final Iterable records = 
Util.randomRecords(numValues, SerializationTestTypeFactory.INT);
+   for (SerializationTestType record : records) {
+   serializedRecords.add(record);
+
+   if (isBroadcastEmit) {
+   writer.broadcastEmit(record);
+   } else {
+   writer.emit(record);
+   }
+   }
+
+   final int requiredBuffers = numValues / (bufferSize / (4 + 
serializationLength));
+   for (int i = 0; i < numChannels; i++) {
 
 Review comment:
   can you somehow extract common logic of this method and 
`org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest#testSerializationRoundTrip(java.lang.Iterable,
 int, 
org.apache.flink.runtime.io.network.api.serialization.RecordSerializer,
 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer)`?
 They share a lot of code.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576399#comment-16576399
 ] 

ASF GitHub Bot commented on FLINK-10041:


azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract 
iterators from RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#discussion_r209288519
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
 ##
 @@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+/**
+ * Iterator that merges multiple RocksDB iterators to partition all states 
into contiguous key-groups.
+ * The resulting iteration sequence is ordered by (key-group, kv-state).
+ */
+public class RocksStatesPerKeyGroupMergeIterator implements AutoCloseable {
+
+   private final PriorityQueue heap;
+   private final int keyGroupPrefixByteCount;
+   private boolean newKeyGroup;
+   private boolean newKVState;
+   private boolean valid;
+   private RocksSingleStateIterator currentSubIterator;
+
+   private static final List> 
COMPARATORS;
+
+   static {
+   int maxBytes = 2;
+   COMPARATORS = new ArrayList<>(maxBytes);
+   for (int i = 0; i < maxBytes; ++i) {
+   final int currentBytes = i + 1;
+   COMPARATORS.add((o1, o2) -> {
+   int arrayCmpRes = compareKeyGroupsForByteArrays(
+   o1.getCurrentKey(), o2.getCurrentKey(), 
currentBytes);
+   return arrayCmpRes == 0 ? o1.getKvStateId() - 
o2.getKvStateId() : arrayCmpRes;
+   });
+   }
+   }
+
+   public RocksStatesPerKeyGroupMergeIterator(
+   List> kvStateIterators,
+   final int keyGroupPrefixByteCount) {
+   Preconditions.checkNotNull(kvStateIterators);
+   Preconditions.checkArgument(keyGroupPrefixByteCount >= 1);
+
+   this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
+
+   Comparator iteratorComparator = 
COMPARATORS.get(keyGroupPrefixByteCount - 1);
+
+   if (kvStateIterators.size() > 0) {
+   PriorityQueue 
iteratorPriorityQueue =
+   new PriorityQueue<>(kvStateIterators.size(), 
iteratorComparator);
+
+   for (Tuple2 
rocksIteratorWithKVStateId : kvStateIterators) {
+   final RocksIteratorWrapper rocksIterator = 
rocksIteratorWithKVStateId.f0;
+   rocksIterator.seekToFirst();
+   if (rocksIterator.isValid()) {
+   iteratorPriorityQueue.offer(new 
RocksSingleStateIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
+   } else {
+   IOUtils.closeQuietly(rocksIterator);
+   }
+   }
+
+   kvStateIterators.clear();
+
+   this.heap = iteratorPriorityQueue;
+   this.valid = !heap.isEmpty();
+   this.currentSubIterator = heap.poll();
+   } else {
+   // creating a PriorityQueue of size 0 results in an 
exception.
+   this.heap = null;
+   this.valid = false;
+   }
+
+   this.newKeyGroup = true;
+   this.newKVState = true;
+   }
+
+   /**
+* Advance the iterator. Should only be called if {@link #isValid()} 

[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576402#comment-16576402
 ] 

ASF GitHub Bot commented on FLINK-10041:


azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract 
iterators from RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#discussion_r209283620
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java
 ##
 @@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+/**
+ * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link 
Iterator} to iterate over the keys. This class
+ * is not thread safe.
+ *
+ * @param  the type of the iterated objects, which are keys in RocksDB.
+ */
+public class RocksStateKeysIterator implements Iterator, AutoCloseable {
+
+   @Nonnull
+   private final RocksIteratorWrapper iterator;
+
+   @Nonnull
+   private final String state;
+
+   @Nonnull
+   private final TypeSerializer keySerializer;
+
+   @Nonnull
+   private final byte[] namespaceBytes;
+
+   private final boolean ambiguousKeyPossible;
+   private final int keyGroupPrefixBytes;
+   private K nextKey;
+   private K previousKey;
+
+   public RocksStateKeysIterator(
+   @Nonnull RocksIteratorWrapper iterator,
+   @Nonnull String state,
+   @Nonnull TypeSerializer keySerializer,
+   int keyGroupPrefixBytes,
+   boolean ambiguousKeyPossible,
+   @Nonnull byte[] namespaceBytes) {
+   this.iterator = iterator;
+   this.state = state;
+   this.keySerializer = keySerializer;
+   this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+   this.namespaceBytes = namespaceBytes;
+   this.nextKey = null;
+   this.previousKey = null;
+   this.ambiguousKeyPossible = ambiguousKeyPossible;
+   }
+
+   @Override
+   public boolean hasNext() {
+   try {
+   while (nextKey == null && iterator.isValid()) {
+
+   byte[] key = iterator.key();
+
+   ByteArrayInputStreamWithPos inputStream =
+   new ByteArrayInputStreamWithPos(key, 
keyGroupPrefixBytes, key.length - keyGroupPrefixBytes);
+
+   DataInputViewStreamWrapper dataInput = new 
DataInputViewStreamWrapper(inputStream);
+
+   K value = RocksDBKeySerializationUtils.readKey(
 
 Review comment:
   value -> currentKey


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extract all different iterators (inner or static inner classes) into full 
> classes
> -
>
> Key: FLINK-10041
> URL: https://issues.apache.org/jira/browse/FLINK-10041
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>





[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576400#comment-16576400
 ] 

ASF GitHub Bot commented on FLINK-10041:


azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract 
iterators from RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#discussion_r209280247
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Wraps a RocksDB iterator to cache it's current key and assigns an id for 
the key/value state to the iterator.
+ * Used by {@link RocksStatesPerKeyGroupMergeIterator}.
+ */
+class RocksSingleStateIterator implements AutoCloseable {
+
+   /**
+* @param iterator  The #RocksIterator to wrap .
 
 Review comment:
   Maybe:
   `@param iterator underlying {@link RocksIteratorWrapper}`
   otherwise it is like wrapper to wrap wrapper :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extract all different iterators (inner or static inner classes) into full 
> classes
> -
>
> Key: FLINK-10041
> URL: https://issues.apache.org/jira/browse/FLINK-10041
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576401#comment-16576401
 ] 

ASF GitHub Bot commented on FLINK-10041:


azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract 
iterators from RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#discussion_r209290925
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksTransformingIteratorWrapper.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+
+import org.rocksdb.RocksIterator;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Wrapper around {@link RocksIteratorWrapper} that applies a given {@link 
StateSnapshotTransformer} to the elements
 
 Review comment:
   Wrapper around {@link *RocksIterator*}


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extract all different iterators (inner or static inner classes) into full 
> classes
> -
>
> Key: FLINK-10041
> URL: https://issues.apache.org/jira/browse/FLINK-10041
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576403#comment-16576403
 ] 

ASF GitHub Bot commented on FLINK-10041:


azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract 
iterators from RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#discussion_r209287458
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
 ##
 @@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+/**
+ * Iterator that merges multiple RocksDB iterators to partition all states 
into contiguous key-groups.
+ * The resulting iteration sequence is ordered by (key-group, kv-state).
+ */
+public class RocksStatesPerKeyGroupMergeIterator implements AutoCloseable {
+
+   private final PriorityQueue<RocksSingleStateIterator> heap;
+   private final int keyGroupPrefixByteCount;
+   private boolean newKeyGroup;
+   private boolean newKVState;
+   private boolean valid;
+   private RocksSingleStateIterator currentSubIterator;
+
+   private static final List<Comparator<RocksSingleStateIterator>> COMPARATORS;
+
+   static {
+   int maxBytes = 2;
+   COMPARATORS = new ArrayList<>(maxBytes);
+   for (int i = 0; i < maxBytes; ++i) {
+   final int currentBytes = i + 1;
+   COMPARATORS.add((o1, o2) -> {
+   int arrayCmpRes = compareKeyGroupsForByteArrays(
+   o1.getCurrentKey(), o2.getCurrentKey(), 
currentBytes);
+   return arrayCmpRes == 0 ? o1.getKvStateId() - 
o2.getKvStateId() : arrayCmpRes;
+   });
+   }
+   }
+
+   public RocksStatesPerKeyGroupMergeIterator(
+   List> kvStateIterators,
+   final int keyGroupPrefixByteCount) {
+   Preconditions.checkNotNull(kvStateIterators);
+   Preconditions.checkArgument(keyGroupPrefixByteCount >= 1);
+
+   this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
+
+   Comparator iteratorComparator = 
COMPARATORS.get(keyGroupPrefixByteCount - 1);
+
+   if (kvStateIterators.size() > 0) {
+   PriorityQueue 
iteratorPriorityQueue =
 
 Review comment:
   This could also be a separate method, like:
   `PriorityQueue fillQueue(... kvStateIterators)`
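   A minimal sketch of what such an extracted helper could look like, reusing the
   types and the queue-construction logic from the diff above. The method and
   variable names are illustrative assumptions, not the code that was eventually merged:

```
private PriorityQueue<RocksSingleStateIterator> fillQueue(
		List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators,
		Comparator<RocksSingleStateIterator> iteratorComparator) {

	// same semantics as the constructor body above:
	// seek every iterator to its first entry and drop the empty ones
	PriorityQueue<RocksSingleStateIterator> queue =
		new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);

	for (Tuple2<RocksIteratorWrapper, Integer> iteratorWithId : kvStateIterators) {
		final RocksIteratorWrapper rocksIterator = iteratorWithId.f0;
		rocksIterator.seekToFirst();
		if (rocksIterator.isValid()) {
			queue.offer(new RocksSingleStateIterator(rocksIterator, iteratorWithId.f1));
		} else {
			IOUtils.closeQuietly(rocksIterator);
		}
	}
	return queue;
}
```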


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extract all different iterators (inner or static inner classes) into full 
> classes
> -
>
> Key: FLINK-10041
> URL: https://issues.apache.org/jira/browse/FLINK-10041
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576398#comment-16576398
 ] 

ASF GitHub Bot commented on FLINK-10041:


azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract 
iterators from RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#discussion_r209283561
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java
 ##
 @@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+/**
+ * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys.
+ * This class is not thread safe.
+ *
+ * @param <K> the type of the iterated objects, which are keys in RocksDB.
+ */
+public class RocksStateKeysIterator<K> implements Iterator<K>, AutoCloseable {
+
+   @Nonnull
+   private final RocksIteratorWrapper iterator;
+
+   @Nonnull
+   private final String state;
+
+   @Nonnull
+   private final TypeSerializer keySerializer;
+
+   @Nonnull
+   private final byte[] namespaceBytes;
+
+   private final boolean ambiguousKeyPossible;
+   private final int keyGroupPrefixBytes;
+   private K nextKey;
+   private K previousKey;
+
+   public RocksStateKeysIterator(
+   @Nonnull RocksIteratorWrapper iterator,
+   @Nonnull String state,
+   @Nonnull TypeSerializer keySerializer,
+   int keyGroupPrefixBytes,
+   boolean ambiguousKeyPossible,
+   @Nonnull byte[] namespaceBytes) {
+   this.iterator = iterator;
+   this.state = state;
+   this.keySerializer = keySerializer;
+   this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+   this.namespaceBytes = namespaceBytes;
+   this.nextKey = null;
+   this.previousKey = null;
+   this.ambiguousKeyPossible = ambiguousKeyPossible;
+   }
+
+   @Override
+   public boolean hasNext() {
+   try {
+   while (nextKey == null && iterator.isValid()) {
+
+   byte[] key = iterator.key();
+
+   ByteArrayInputStreamWithPos inputStream =
 
 Review comment:
   I would suggest moving the deserialisation logic into a smaller method, like:
   `Tuple2 deserKeyAndNamespacePos(byte[] keyBytes)`
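   For illustration only, such a helper might look roughly like this inside the
   class from the diff. The method name, the generic return type, and the exact
   `RocksDBKeySerializationUtils.readKey` parameters are assumptions, since the
   excerpt above is cut off at the `readKey(` call:

```
private Tuple2<K, Integer> deserializeKeyAndNamespacePos(byte[] keyBytes) throws IOException {
	// skip the key-group prefix, then read the key with the configured serializer
	ByteArrayInputStreamWithPos inputStream = new ByteArrayInputStreamWithPos(
		keyBytes, keyGroupPrefixBytes, keyBytes.length - keyGroupPrefixBytes);
	DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(inputStream);

	K key = RocksDBKeySerializationUtils.readKey(
		keySerializer, inputStream, dataInput, ambiguousKeyPossible);

	// the stream position now points at the serialized namespace
	return Tuple2.of(key, inputStream.getPosition());
}
```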


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extract all different iterators (inner or static inner classes) into full 
> classes
> -
>
> Key: FLINK-10041
> URL: https://issues.apache.org/jira/browse/FLINK-10041
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10122) KafkaConsumer should use partitionable state over union state if partition discovery is not active

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576396#comment-16576396
 ] 

ASF GitHub Bot commented on FLINK-10122:


StefanRRichter commented on a change in pull request #6537: [FLINK-10122] 
KafkaConsumer should use partitionable state over union state if partition 
discovery is not active
URL: https://github.com/apache/flink/pull/6537#discussion_r209291672
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 ##
 @@ -196,6 +196,20 @@ public void removeOperatorState(String name) {
}
}
 
+   public void deleteBroadCastState(String name) {
 
 Review comment:
   Argh...will remove the second one ;)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KafkaConsumer should use partitionable state over union state if partition 
> discovery is not active
> --
>
> Key: FLINK-10122
> URL: https://issues.apache.org/jira/browse/FLINK-10122
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The KafkaConsumer always stores its offset state as union state. I think this 
> is only required when partition discovery is active. For jobs with very high 
> parallelism, the union state can lead to prohibitively expensive deployments. 
> For example, a job with 2000 sources and a total of 10MB of checkpointed 
> offsets in union state would have to ship ~2000 x 10MB = 20GB of state. With 
> partitionable state, it would only have to ship ~10MB.
> For now, I would suggest going back to partitionable state when partition 
> discovery is not active. In the long run, I have some ideas for more efficient 
> partitioning schemes that would also work for active discovery.
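To make the difference concrete, here is a minimal sketch of how a source could
choose between the two state flavours in initializeState(). OffsetEntry and
partitionDiscoveryEnabled are placeholders for illustration, not the actual
consumer code:

```
public void initializeState(FunctionInitializationContext context) throws Exception {
	ListStateDescriptor<OffsetEntry> descriptor =
		new ListStateDescriptor<>("topic-partition-offset-states", OffsetEntry.class);

	ListState<OffsetEntry> offsetsState = partitionDiscoveryEnabled
		// union list state: every subtask receives *all* entries on restore,
		// which is what makes redeployments expensive at high parallelism
		? context.getOperatorStateStore().getUnionListState(descriptor)
		// plain (round-robin) list state: each subtask only receives its share
		: context.getOperatorStateStore().getListState(descriptor);

	// restore offsets from offsetsState here; re-add the current offsets in snapshotState()
}
```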



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner …

2018-08-10 Thread GitBox
azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract 
iterators from RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#discussion_r209290925
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksTransformingIteratorWrapper.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+
+import org.rocksdb.RocksIterator;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Wrapper around {@link RocksIteratorWrapper} that applies a given {@link 
StateSnapshotTransformer} to the elements
 
 Review comment:
   Wrapper around {@link *RocksIterator*}


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner …

2018-08-10 Thread GitBox
azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract 
iterators from RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#discussion_r209288519
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
 ##
 @@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+/**
+ * Iterator that merges multiple RocksDB iterators to partition all states 
into contiguous key-groups.
+ * The resulting iteration sequence is ordered by (key-group, kv-state).
+ */
+public class RocksStatesPerKeyGroupMergeIterator implements AutoCloseable {
+
+   private final PriorityQueue<RocksSingleStateIterator> heap;
+   private final int keyGroupPrefixByteCount;
+   private boolean newKeyGroup;
+   private boolean newKVState;
+   private boolean valid;
+   private RocksSingleStateIterator currentSubIterator;
+
+   private static final List<Comparator<RocksSingleStateIterator>> COMPARATORS;
+
+   static {
+   int maxBytes = 2;
+   COMPARATORS = new ArrayList<>(maxBytes);
+   for (int i = 0; i < maxBytes; ++i) {
+   final int currentBytes = i + 1;
+   COMPARATORS.add((o1, o2) -> {
+   int arrayCmpRes = compareKeyGroupsForByteArrays(
+   o1.getCurrentKey(), o2.getCurrentKey(), 
currentBytes);
+   return arrayCmpRes == 0 ? o1.getKvStateId() - 
o2.getKvStateId() : arrayCmpRes;
+   });
+   }
+   }
+
+   public RocksStatesPerKeyGroupMergeIterator(
+   List> kvStateIterators,
+   final int keyGroupPrefixByteCount) {
+   Preconditions.checkNotNull(kvStateIterators);
+   Preconditions.checkArgument(keyGroupPrefixByteCount >= 1);
+
+   this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
+
+   Comparator iteratorComparator = 
COMPARATORS.get(keyGroupPrefixByteCount - 1);
+
+   if (kvStateIterators.size() > 0) {
+   PriorityQueue 
iteratorPriorityQueue =
+   new PriorityQueue<>(kvStateIterators.size(), 
iteratorComparator);
+
+   for (Tuple2 
rocksIteratorWithKVStateId : kvStateIterators) {
+   final RocksIteratorWrapper rocksIterator = 
rocksIteratorWithKVStateId.f0;
+   rocksIterator.seekToFirst();
+   if (rocksIterator.isValid()) {
+   iteratorPriorityQueue.offer(new 
RocksSingleStateIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
+   } else {
+   IOUtils.closeQuietly(rocksIterator);
+   }
+   }
+
+   kvStateIterators.clear();
+
+   this.heap = iteratorPriorityQueue;
+   this.valid = !heap.isEmpty();
+   this.currentSubIterator = heap.poll();
+   } else {
+   // creating a PriorityQueue of size 0 results in an 
exception.
+   this.heap = null;
+   this.valid = false;
+   }
+
+   this.newKeyGroup = true;
+   this.newKVState = true;
+   }
+
+   /**
+* Advance the iterator. Should only be called if {@link #isValid()} 
returned true. Valid can only chance after
+* calls to {@link #next()}.
 
 Review comment:
   Typos:
   Valid flag can only *change* after calling {@link #next()}


This 

[GitHub] azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner …

2018-08-10 Thread GitBox
azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract 
iterators from RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#discussion_r209283561
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java
 ##
 @@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+/**
+ * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys.
+ * This class is not thread safe.
+ *
+ * @param <K> the type of the iterated objects, which are keys in RocksDB.
+ */
+public class RocksStateKeysIterator<K> implements Iterator<K>, AutoCloseable {
+
+   @Nonnull
+   private final RocksIteratorWrapper iterator;
+
+   @Nonnull
+   private final String state;
+
+   @Nonnull
+   private final TypeSerializer keySerializer;
+
+   @Nonnull
+   private final byte[] namespaceBytes;
+
+   private final boolean ambiguousKeyPossible;
+   private final int keyGroupPrefixBytes;
+   private K nextKey;
+   private K previousKey;
+
+   public RocksStateKeysIterator(
+   @Nonnull RocksIteratorWrapper iterator,
+   @Nonnull String state,
+   @Nonnull TypeSerializer keySerializer,
+   int keyGroupPrefixBytes,
+   boolean ambiguousKeyPossible,
+   @Nonnull byte[] namespaceBytes) {
+   this.iterator = iterator;
+   this.state = state;
+   this.keySerializer = keySerializer;
+   this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+   this.namespaceBytes = namespaceBytes;
+   this.nextKey = null;
+   this.previousKey = null;
+   this.ambiguousKeyPossible = ambiguousKeyPossible;
+   }
+
+   @Override
+   public boolean hasNext() {
+   try {
+   while (nextKey == null && iterator.isValid()) {
+
+   byte[] key = iterator.key();
+
+   ByteArrayInputStreamWithPos inputStream =
 
 Review comment:
   I would suggest to move deserialisation stuff into smaller method, like:
   `Tuple2 deserKeyAndNamespacePos(byte[] keyBytes)`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner …

2018-08-10 Thread GitBox
azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract 
iterators from RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#discussion_r209280247
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Wraps a RocksDB iterator to cache it's current key and assigns an id for 
the key/value state to the iterator.
+ * Used by {@link RocksStatesPerKeyGroupMergeIterator}.
+ */
+class RocksSingleStateIterator implements AutoCloseable {
+
+   /**
+* @param iterator  The #RocksIterator to wrap .
 
 Review comment:
   Maybe:
   `@param iterator underlying {@link RocksIteratorWrapper}`
   otherwise it is like wrapper to wrap wrapper :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner …

2018-08-10 Thread GitBox
azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract 
iterators from RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#discussion_r209283620
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java
 ##
 @@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+/**
+ * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys.
+ * This class is not thread safe.
+ *
+ * @param <K> the type of the iterated objects, which are keys in RocksDB.
+ */
+public class RocksStateKeysIterator<K> implements Iterator<K>, AutoCloseable {
+
+   @Nonnull
+   private final RocksIteratorWrapper iterator;
+
+   @Nonnull
+   private final String state;
+
+   @Nonnull
+   private final TypeSerializer keySerializer;
+
+   @Nonnull
+   private final byte[] namespaceBytes;
+
+   private final boolean ambiguousKeyPossible;
+   private final int keyGroupPrefixBytes;
+   private K nextKey;
+   private K previousKey;
+
+   public RocksStateKeysIterator(
+   @Nonnull RocksIteratorWrapper iterator,
+   @Nonnull String state,
+   @Nonnull TypeSerializer keySerializer,
+   int keyGroupPrefixBytes,
+   boolean ambiguousKeyPossible,
+   @Nonnull byte[] namespaceBytes) {
+   this.iterator = iterator;
+   this.state = state;
+   this.keySerializer = keySerializer;
+   this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+   this.namespaceBytes = namespaceBytes;
+   this.nextKey = null;
+   this.previousKey = null;
+   this.ambiguousKeyPossible = ambiguousKeyPossible;
+   }
+
+   @Override
+   public boolean hasNext() {
+   try {
+   while (nextKey == null && iterator.isValid()) {
+
+   byte[] key = iterator.key();
+
+   ByteArrayInputStreamWithPos inputStream =
+   new ByteArrayInputStreamWithPos(key, 
keyGroupPrefixBytes, key.length - keyGroupPrefixBytes);
+
+   DataInputViewStreamWrapper dataInput = new 
DataInputViewStreamWrapper(inputStream);
+
+   K value = RocksDBKeySerializationUtils.readKey(
 
 Review comment:
   value -> currentKey


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner …

2018-08-10 Thread GitBox
azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract 
iterators from RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#discussion_r209287458
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
 ##
 @@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+/**
+ * Iterator that merges multiple RocksDB iterators to partition all states 
into contiguous key-groups.
+ * The resulting iteration sequence is ordered by (key-group, kv-state).
+ */
+public class RocksStatesPerKeyGroupMergeIterator implements AutoCloseable {
+
+   private final PriorityQueue<RocksSingleStateIterator> heap;
+   private final int keyGroupPrefixByteCount;
+   private boolean newKeyGroup;
+   private boolean newKVState;
+   private boolean valid;
+   private RocksSingleStateIterator currentSubIterator;
+
+   private static final List<Comparator<RocksSingleStateIterator>> COMPARATORS;
+
+   static {
+   int maxBytes = 2;
+   COMPARATORS = new ArrayList<>(maxBytes);
+   for (int i = 0; i < maxBytes; ++i) {
+   final int currentBytes = i + 1;
+   COMPARATORS.add((o1, o2) -> {
+   int arrayCmpRes = compareKeyGroupsForByteArrays(
+   o1.getCurrentKey(), o2.getCurrentKey(), 
currentBytes);
+   return arrayCmpRes == 0 ? o1.getKvStateId() - 
o2.getKvStateId() : arrayCmpRes;
+   });
+   }
+   }
+
+   public RocksStatesPerKeyGroupMergeIterator(
+   List> kvStateIterators,
+   final int keyGroupPrefixByteCount) {
+   Preconditions.checkNotNull(kvStateIterators);
+   Preconditions.checkArgument(keyGroupPrefixByteCount >= 1);
+
+   this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
+
+   Comparator iteratorComparator = 
COMPARATORS.get(keyGroupPrefixByteCount - 1);
+
+   if (kvStateIterators.size() > 0) {
+   PriorityQueue 
iteratorPriorityQueue =
 
 Review comment:
   could be also separate method like:
   `PriorityQueue fillQueue(... kvStateIterators)`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter commented on a change in pull request #6537: [FLINK-10122] KafkaConsumer should use partitionable state over union state if partition discovery is not active

2018-08-10 Thread GitBox
StefanRRichter commented on a change in pull request #6537: [FLINK-10122] 
KafkaConsumer should use partitionable state over union state if partition 
discovery is not active
URL: https://github.com/apache/flink/pull/6537#discussion_r209291672
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 ##
 @@ -196,6 +196,20 @@ public void removeOperatorState(String name) {
}
}
 
+   public void deleteBroadCastState(String name) {
 
 Review comment:
   Argh...will remove the second one ;)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10122) KafkaConsumer should use partitionable state over union state if partition discovery is not active

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576394#comment-16576394
 ] 

ASF GitHub Bot commented on FLINK-10122:


aljoscha commented on a change in pull request #6537: [FLINK-10122] 
KafkaConsumer should use partitionable state over union state if partition 
discovery is not active
URL: https://github.com/apache/flink/pull/6537#discussion_r209291065
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 ##
 @@ -196,6 +196,20 @@ public void removeOperatorState(String name) {
}
}
 
+   public void deleteBroadCastState(String name) {
 
 Review comment:
   I think you added these methods twice, once under `deleteBroadcastState` and 
once under `deleteBroadCastState`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KafkaConsumer should use partitionable state over union state if partition 
> discovery is not active
> --
>
> Key: FLINK-10122
> URL: https://issues.apache.org/jira/browse/FLINK-10122
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The KafkaConsumer always stores its offset state as union state. I think this 
> is only required when partition discovery is active. For jobs with very high 
> parallelism, the union state can lead to prohibitively expensive deployments. 
> For example, a job with 2000 sources and a total of 10MB of checkpointed 
> offsets in union state would have to ship ~2000 x 10MB = 20GB of state. With 
> partitionable state, it would only have to ship ~10MB.
> For now, I would suggest going back to partitionable state when partition 
> discovery is not active. In the long run, I have some ideas for more efficient 
> partitioning schemes that would also work for active discovery.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] aljoscha commented on a change in pull request #6537: [FLINK-10122] KafkaConsumer should use partitionable state over union state if partition discovery is not active

2018-08-10 Thread GitBox
aljoscha commented on a change in pull request #6537: [FLINK-10122] 
KafkaConsumer should use partitionable state over union state if partition 
discovery is not active
URL: https://github.com/apache/flink/pull/6537#discussion_r209291065
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 ##
 @@ -196,6 +196,20 @@ public void removeOperatorState(String name) {
}
}
 
+   public void deleteBroadCastState(String name) {
 
 Review comment:
   I think you added these methods twice, once under `deleteBroadcastState` and 
once under `deleteBroadCastState`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] aljoscha commented on a change in pull request #6538: [hotfix][streaming] Fix and simplify PrintSinkFunctionTest

2018-08-10 Thread GitBox
aljoscha commented on a change in pull request #6538: [hotfix][streaming] Fix 
and simplify PrintSinkFunctionTest
URL: https://github.com/apache/flink/pull/6538#discussion_r209283937
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ##
 @@ -20,106 +20,84 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
 
 import org.junit.After;
-import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 
 import static org.junit.Assert.assertEquals;
 
 /**
- * Tests for the {@link 
org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}.
+ * Tests for the {@link PrintSinkFunction}.
  */
 public class PrintSinkFunctionTest {
 
-   public PrintStream printStreamOriginal = System.out;
-   private String line = System.lineSeparator();
+   private final PrintStream originalSystemOut = System.out;
+   private final PrintStream originalSystemErr = System.err;
 
-   @Test
-   public void testPrintSinkStdOut() throws Exception {
-   ByteArrayOutputStream baos = new ByteArrayOutputStream();
-   PrintStream stream = new PrintStream(baos);
-   System.setOut(stream);
+   private final ByteArrayOutputStream arrayOutputStream = new 
ByteArrayOutputStream();
+   private final ByteArrayOutputStream arrayErrorStream = new 
ByteArrayOutputStream();
 
-   final StreamingRuntimeContext ctx = 
Mockito.mock(StreamingRuntimeContext.class);
+   private final String line = System.lineSeparator();
 
-   PrintSinkFunction printSink = new PrintSinkFunction<>();
-   printSink.setRuntimeContext(ctx);
-   try {
-   printSink.open(new Configuration());
-   } catch (Exception e) {
-   Assert.fail();
+   @Before
+   public void setUp() {
+   System.setOut(new PrintStream(arrayOutputStream));
+   System.setErr(new PrintStream(arrayErrorStream));
+   }
+
+   @After
+   public void tearDown() {
+   if (System.out != originalSystemOut) {
+   System.out.close();
}
+   if (System.err != originalSystemErr) {
+   System.err.close();
+   }
+   System.setOut(originalSystemOut);
+   System.setErr(originalSystemErr);
+   }
+
+   @Test
+   public void testPrintSinkStdOut() throws Exception {
+   PrintSinkFunction printSink = new PrintSinkFunction<>();
+   printSink.setRuntimeContext(new 
MockStreamingRuntimeContext(false, 1, 0));
printSink.setTargetToStandardOut();
+   printSink.open(new Configuration());
+
printSink.invoke("hello world!", 
SinkContextUtil.forTimestamp(0));
 
assertEquals("Print to System.out", printSink.toString());
-   assertEquals("hello world!" + line, baos.toString());
-
-   printSink.close();
 
 Review comment:
   we could still close the sink function
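   For example, a sketch of the suggested change on top of the refactored test
   above (the exact assertion on `arrayOutputStream` is assumed from the removed
   `baos` assertion, since the diff context is cut off here):

```
printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0));

assertEquals("Print to System.out", printSink.toString());
assertEquals("hello world!" + line, arrayOutputStream.toString());

// keep the open()/close() lifecycle of the sink function symmetric
printSink.close();
```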


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10006) Improve logging in BarrierBuffer

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576366#comment-16576366
 ] 

ASF GitHub Bot commented on FLINK-10006:


dawidwys commented on issue #6470: [FLINK-10006][network] improve logging in 
BarrierBuffer: prepend owning task name
URL: https://github.com/apache/flink/pull/6470#issuecomment-412103541
 
 
   +1, lgtm


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve logging in BarrierBuffer
> 
>
> Key: FLINK-10006
> URL: https://issues.apache.org/jira/browse/FLINK-10006
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.2, 1.6.0, 1.7.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> Almost all log messages of {{BarrierBuffer}} do not contain the task name and 
> are therefore of little use if either multiple slots are executed on a single 
> TM or multiple checkpoints run in parallel.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dawidwys commented on issue #6470: [FLINK-10006][network] improve logging in BarrierBuffer: prepend owning task name

2018-08-10 Thread GitBox
dawidwys commented on issue #6470: [FLINK-10006][network] improve logging in 
BarrierBuffer: prepend owning task name
URL: https://github.com/apache/flink/pull/6470#issuecomment-412103541
 
 
   +1, lgtm


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10109) Add documentation for StreamingFileSink

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576352#comment-16576352
 ] 

ASF GitHub Bot commented on FLINK-10109:


aljoscha commented on issue #6532: [FLINK-10109] Add documentation for 
StreamingFileSink
URL: https://github.com/apache/flink/pull/6532#issuecomment-412101222
 
 
   I think I addressed all comments, PTAL.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add documentation for StreamingFileSink
> ---
>
> Key: FLINK-10109
> URL: https://issues.apache.org/jira/browse/FLINK-10109
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] aljoscha commented on issue #6532: [FLINK-10109] Add documentation for StreamingFileSink

2018-08-10 Thread GitBox
aljoscha commented on issue #6532: [FLINK-10109] Add documentation for 
StreamingFileSink
URL: https://github.com/apache/flink/pull/6532#issuecomment-412101222
 
 
   I think I addressed all comments, PTAL.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] asfgit closed pull request #6527: [hotfix] [docs] Fix ProcessWindowFunction code snippets.

2018-08-10 Thread GitBox
asfgit closed pull request #6527: [hotfix] [docs] Fix ProcessWindowFunction 
code snippets.
URL: https://github.com/apache/flink/pull/6527
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/stream/operators/windows.md 
b/docs/dev/stream/operators/windows.md
index e5ec0091e07..bebc5dd260e 100644
--- a/docs/dev/stream/operators/windows.md
+++ b/docs/dev/stream/operators/windows.md
@@ -724,17 +724,19 @@ A `ProcessWindowFunction` can be defined and used like 
this:
 DataStream> input = ...;
 
 input
-.keyBy()
-.window()
-.process(new MyProcessWindowFunction());
+  .keyBy(t -> t.f0)
+  .timeWindow(Time.minutes(5))
+  .process(new MyProcessWindowFunction());
 
 /* ... */
 
-public class MyProcessWindowFunction extends 
ProcessWindowFunction, String, String, TimeWindow> {
+public class MyProcessWindowFunction 
+extends ProcessWindowFunction, String, String, 
TimeWindow> {
 
-  void process(String key, Context context, Iterable> 
input, Collector out) {
+  @Override
+  public void process(String key, Context context, Iterable> input, Collector out) {
 long count = 0;
-for (Tuple in: input) {
+for (Tuple2 in: input) {
   count++;
 }
 out.collect("Window: " + context.window() + "count: " + count);
@@ -749,9 +751,9 @@ public class MyProcessWindowFunction extends 
ProcessWindowFunction)
-.window()
-.process(new MyProcessWindowFunction())
+  .keyBy(_._1)
+  .timeWindow(Time.minutes(5))
+  .process(new MyProcessWindowFunction())
 
 /* ... */
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10047) Checkpointing takes much longer with credit base flow control.

2018-08-10 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576320#comment-16576320
 ] 

Piotr Nowojski commented on FLINK-10047:


[~zjwang] at the moment it seems that this is not a problem of credit-based 
flow control. Still investigating; I will keep you posted :)

> Checkpointing takes much longer with credit base flow control.
> --
>
> Key: FLINK-10047
> URL: https://issues.apache.org/jira/browse/FLINK-10047
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2
>Reporter: Piotr Nowojski
>Priority: Critical
>
> As reported by a user, in some scenarios checkpointing takes significantly 
> more time (~40 minutes compared to ~2 minutes in Flink 1.4). Throughput is 
> probably also lower.
> We are waiting for more logs with detailed Flink metrics to get more insight.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] fhueske commented on issue #6527: [hotfix] [docs] Fix ProcessWindowFunction code snippets.

2018-08-10 Thread GitBox
fhueske commented on issue #6527: [hotfix] [docs] Fix ProcessWindowFunction 
code snippets.
URL: https://github.com/apache/flink/pull/6527#issuecomment-412090788
 
 
   Thanks @dawidwys 
   Merging


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576310#comment-16576310
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209260792
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ##
 @@ -117,6 +117,65 @@ public void testPrintSinkWithPrefix() throws Exception {
stream.close();
}
 
+   @Test
+   public void testPrintSinkWithIdentifierAndPrefix() throws Exception {
+   ByteArrayOutputStream baos = new ByteArrayOutputStream();
+   PrintStream stream = new PrintStream(baos);
+   System.setOut(stream);
+
+   final StreamingRuntimeContext ctx = 
Mockito.mock(StreamingRuntimeContext.class);
+   Mockito.when(ctx.getNumberOfParallelSubtasks()).thenReturn(2);
+   Mockito.when(ctx.getIndexOfThisSubtask()).thenReturn(1);
+
+   PrintSinkFunction printSink = new 
PrintSinkFunction<>(false, "mySink");
+   printSink.setRuntimeContext(ctx);
+   try {
+   printSink.open(new Configuration());
+   } catch (Exception e) {
+   Assert.fail();
+   }
+   printSink.invoke("hello world!", 
SinkContextUtil.forTimestamp(0));
+
+   assertEquals("Print to System.out", printSink.toString());
+   assertEquals("mySink:2> hello world!" + line, baos.toString());
+
+   printSink.setTargetToStandardErr();
+   assertEquals("Print to System.err", printSink.toString());
+   assertEquals("mySink:2> hello world!" + line, baos.toString());
+
+   printSink.close();
+   stream.close();
+   }
+
+   @Test
+   public void testPrintSinkWithIdentifierButNoPrefix() throws Exception {
+   ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
 Review comment:
   There is quite a lot of code duplication in those tests, and they were 
unnecessarily using Mockito instead of a proper mock. There was even a bug 
in `testPrintSinkStdErr`. I have fixed those issues in a hotfix: 
https://github.com/apache/flink/pull/6538
   
   I would like the hotfix to be merged before this PR; please adapt/rewrite 
your tests in a similar fashion to my hotfix.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The print method of {{DataSet}} allows the user to supply a String that 
> identifies the output (see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]), but 
> {{DataStream}} does not support this yet. It would be valuable to add this 
> feature for {{DataStream}} as well.
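A hypothetical usage sketch of the API proposed in this issue (the method
overload follows the pull request under discussion and may differ from what is
finally merged):

```
DataStream<String> stream = env.fromElements("a", "b", "c");

// plain print, as today: with parallelism > 1 output looks like "2> a"
stream.print();

// with an identifier, as proposed here: output looks like "mySink:2> a"
stream.print("mySink");
```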



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576314#comment-16576314
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209234301
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -55,6 +57,17 @@ public PrintSinkFunction(boolean stdErr) {
target = stdErr;
}
 
+   /**
+* Instantiates a print sink function that prints to standard out and 
gives a sink identifier.
+*
+* @param stdErr True, if the format should print to standard error 
instead of standard out.
+* @param sinkIdentifier Message that identify sink and is prefixed to 
the output of the value
+*/
+   public PrintSinkFunction(boolean stdErr, String sinkIdentifier) {
+   this(stdErr);
+   this.sinkIdentifier = sinkIdentifier;
 
 Review comment:
   Usually the less detailed constructor calls the more specific one, not the 
other way around. Here it would also allow you to mark the fields as `final`.
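   A sketch of that chaining for the constructors in this diff (the no-argument
   constructor and the default values are assumptions for illustration, not the
   merged code):

```
private final boolean target;           // stdout/stderr selector, as in the diff above
private final String sinkIdentifier;    // may be null when no identifier was given

public PrintSinkFunction() {
	this(false, null);
}

public PrintSinkFunction(boolean stdErr) {
	this(stdErr, null);
}

public PrintSinkFunction(boolean stdErr, String sinkIdentifier) {
	this.target = stdErr;
	this.sinkIdentifier = sinkIdentifier;
}
```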


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The output of the print method of {{DataSet}} allows the user to supply a 
> String to identify the output (see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
> {{DataStream}} doesn't support this yet. It would be valuable to add this feature for 
> {{DataStream}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576311#comment-16576311
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209234901
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -70,15 +83,29 @@ public void open(Configuration parameters) throws 
Exception {
// get the target stream
stream = target == STD_OUT ? System.out : System.err;
 
+   /**
 
 Review comment:
   please move this to the class's javadoc. Also replace `sinkId` / `sink id` with 
`{@code sinkIdentifier}`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The output of the print method of {{DataSet}} allows the user to supply a 
> String to identify the output (see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
> {{DataStream}} doesn't support this yet. It would be valuable to add this feature for 
> {{DataStream}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576308#comment-16576308
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209237840
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -70,15 +83,29 @@ public void open(Configuration parameters) throws 
Exception {
// get the target stream
stream = target == STD_OUT ? System.out : System.err;
 
+   /**
+* Four possible format options:
+*  sinkId:taskId> output  <- sink id provided, parallelism 
> 1
+*  sinkId> output <- sink id provided, parallelism 
== 1
+*  taskId> output <- no sink id provided, 
parallelism > 1
+*  output <- no sink id provided, 
parallelism == 1
+*/
+
// set the prefix if we have a >1 parallelism
prefix = (context.getNumberOfParallelSubtasks() > 1) ?
((context.getIndexOfThisSubtask() + 1) + "> ") 
: null;
+
+   if (prefix == null) {
 
 Review comment:
   Don't use nulls here. In this case we can easily use an empty string for the 
same purpose and it will be safer (no possible NPE).
   
   Btw, rephrasing this logic like that:
   ```
   completedPrefix = sinkIdentifier;
   
   if (context.getNumberOfParallelSubtasks() > 1) {
 if (!completedPrefix.isEmpty()) {
   completedPrefix += ":";
 }
 completedPrefix += (context.getIndexOfThisSubtask() + 1);
   }
   
   if (!completedPrefix.isEmpty()) {
 completedPrefix += ">";
   }
   ```
   (optionally with a ternary operator instead of some of the 'if' statements - that's 
only a matter of taste) simplifies the logic and deduplicates some of the 
code/constants.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The output of the print method of {{DataSet}} allows the user to supply a 
> String to identify the output (see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
> {{DataStream}} doesn't support this yet. It would be valuable to add this feature for 
> {{DataStream}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576309#comment-16576309
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209233947
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -40,6 +40,8 @@
private boolean target;
private transient PrintStream stream;
private transient String prefix;
+   private String sinkIdentifier;
+   private transient String completedPrefix;
 
 Review comment:
   Please drop the `prefix` field; it's only a local variable now. Also please 
change `target` and `sinkIdentifier` into `final` fields.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The output of the print method of {{DataSet}} allows the user to supply a 
> String to identify the output (see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
> {{DataStream}} doesn't support this yet. It would be valuable to add this feature for 
> {{DataStream}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576313#comment-16576313
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209239877
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ##
 @@ -117,6 +117,65 @@ public void testPrintSinkWithPrefix() throws Exception {
stream.close();
}
 
+   @Test
+   public void testPrintSinkWithIdentifierAndPrefix() throws Exception {
+   ByteArrayOutputStream baos = new ByteArrayOutputStream();
+   PrintStream stream = new PrintStream(baos);
+   System.setOut(stream);
+
+   final StreamingRuntimeContext ctx = 
Mockito.mock(StreamingRuntimeContext.class);
+   Mockito.when(ctx.getNumberOfParallelSubtasks()).thenReturn(2);
+   Mockito.when(ctx.getIndexOfThisSubtask()).thenReturn(1);
+
+   PrintSinkFunction printSink = new 
PrintSinkFunction<>(false, "mySink");
+   printSink.setRuntimeContext(ctx);
+   try {
+   printSink.open(new Configuration());
+   } catch (Exception e) {
+   Assert.fail();
 
 Review comment:
   Do not hide the original exception. From what I've heard there was some bug 
with an old JUnit version, which is why this pattern keeps recurring in Flink 
tests. Regardless of whether that was the case, it's not a problem anymore, and 
hiding the original exception makes test failures harder to read/understand (ditto 
for the rest of the file as well).
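   
   In other words, the quoted test could simply declare the exception, roughly like this 
(a sketch of the reshaped test, reusing the names from the diff above):
   ```
   @Test
   public void testPrintSinkWithIdentifierAndPrefix() throws Exception {
       PrintSinkFunction<String> printSink = new PrintSinkFunction<>(false, "mySink");
       printSink.setRuntimeContext(ctx);

       // no try/catch: if open() throws, JUnit reports the original exception
       // and stack trace directly instead of an anonymous Assert.fail()
       printSink.open(new Configuration());

       printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0));
   }
   ```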


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The output of the print method of {{DataSet}} allows the user to supply a 
> String to identify the output (see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
> {{DataStream}} doesn't support this yet. It would be valuable to add this feature for 
> {{DataStream}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576312#comment-16576312
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209238315
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -89,6 +116,8 @@ public void invoke(IN record) {
public void close() {
this.stream = null;
this.prefix = null;
+   this.sinkIdentifier = null;
+   this.completedPrefix = null;
 
 Review comment:
   Please drop this `close` method. It doesn't do anything besides causing 
an `NPE`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The output of the print method of {{DataSet}} allows the user to supply a 
> String to identify the output (see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
> {{DataStream}} doesn't support this yet. It would be valuable to add this feature for 
> {{DataStream}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576307#comment-16576307
 ] 

ASF GitHub Bot commented on FLINK-9850:
---

pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209239178
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ##
 @@ -117,6 +117,65 @@ public void testPrintSinkWithPrefix() throws Exception {
stream.close();
}
 
+   @Test
+   public void testPrintSinkWithIdentifierAndPrefix() throws Exception {
+   ByteArrayOutputStream baos = new ByteArrayOutputStream();
+   PrintStream stream = new PrintStream(baos);
+   System.setOut(stream);
+
+   final StreamingRuntimeContext ctx = 
Mockito.mock(StreamingRuntimeContext.class);
 
 Review comment:
   Do not use Mockito for such things; it makes such tests very difficult to 
maintain in the future. Instead please move the 
`org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseTest.MockRuntimeContext`
 class to the `flink-streaming-java` module, into some utility package, and reuse it 
throughout this whole `PrintSinkFunctionTest`.
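   
   As a purely illustrative stub (this is not the `MockRuntimeContext` class referenced 
above, just a sketch of the hand-written-mock idea as a replacement for Mockito):
   ```
   /** Hypothetical hand-written stub returning fixed values for the two methods the sink reads. */
   final class FixedSubtaskContext {
       private final int numberOfParallelSubtasks;
       private final int indexOfThisSubtask;

       FixedSubtaskContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
           this.numberOfParallelSubtasks = numberOfParallelSubtasks;
           this.indexOfThisSubtask = indexOfThisSubtask;
       }

       int getNumberOfParallelSubtasks() {
           return numberOfParallelSubtasks;
       }

       int getIndexOfThisSubtask() {
           return indexOfThisSubtask;
       }
   }
   ```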


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a string to the print method to identify output for DataStream
> --
>
> Key: FLINK-9850
> URL: https://issues.apache.org/jira/browse/FLINK-9850
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The output of the print method of {{DataSet}} allows the user to supply a 
> String to identify the output (see 
> [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But 
> {{DataStream}} doesn't support this yet. It would be valuable to add this feature for 
> {{DataStream}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209233947
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -40,6 +40,8 @@
private boolean target;
private transient PrintStream stream;
private transient String prefix;
+   private String sinkIdentifier;
+   private transient String completedPrefix;
 
 Review comment:
   Please drop the `prefix` field; it's only a local variable now. Also please 
change `target` and `sinkIdentifier` into `final` fields.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209234301
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -55,6 +57,17 @@ public PrintSinkFunction(boolean stdErr) {
target = stdErr;
}
 
+   /**
+* Instantiates a print sink function that prints to standard out and 
gives a sink identifier.
+*
+* @param stdErr True, if the format should print to standard error 
instead of standard out.
+* @param sinkIdentifier Message that identify sink and is prefixed to 
the output of the value
+*/
+   public PrintSinkFunction(boolean stdErr, String sinkIdentifier) {
+   this(stdErr);
+   this.sinkIdentifier = sinkIdentifier;
 
 Review comment:
   Usually the less detailed constructor calls the more specific one, not the 
other way around. Doing it that way here will also allow you to mark the fields as `final`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209239877
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ##
 @@ -117,6 +117,65 @@ public void testPrintSinkWithPrefix() throws Exception {
stream.close();
}
 
+   @Test
+   public void testPrintSinkWithIdentifierAndPrefix() throws Exception {
+   ByteArrayOutputStream baos = new ByteArrayOutputStream();
+   PrintStream stream = new PrintStream(baos);
+   System.setOut(stream);
+
+   final StreamingRuntimeContext ctx = 
Mockito.mock(StreamingRuntimeContext.class);
+   Mockito.when(ctx.getNumberOfParallelSubtasks()).thenReturn(2);
+   Mockito.when(ctx.getIndexOfThisSubtask()).thenReturn(1);
+
+   PrintSinkFunction printSink = new 
PrintSinkFunction<>(false, "mySink");
+   printSink.setRuntimeContext(ctx);
+   try {
+   printSink.open(new Configuration());
+   } catch (Exception e) {
+   Assert.fail();
 
 Review comment:
   Do not hide the original exception. From what I've heard there was some bug 
with an old JUnit version, which is why this pattern keeps recurring in Flink 
tests. Regardless of whether that was the case, it's not a problem anymore, and 
hiding the original exception makes test failures harder to read/understand (ditto 
for the rest of the file as well).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209260792
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ##
 @@ -117,6 +117,65 @@ public void testPrintSinkWithPrefix() throws Exception {
stream.close();
}
 
+   @Test
+   public void testPrintSinkWithIdentifierAndPrefix() throws Exception {
+   ByteArrayOutputStream baos = new ByteArrayOutputStream();
+   PrintStream stream = new PrintStream(baos);
+   System.setOut(stream);
+
+   final StreamingRuntimeContext ctx = 
Mockito.mock(StreamingRuntimeContext.class);
+   Mockito.when(ctx.getNumberOfParallelSubtasks()).thenReturn(2);
+   Mockito.when(ctx.getIndexOfThisSubtask()).thenReturn(1);
+
+   PrintSinkFunction printSink = new 
PrintSinkFunction<>(false, "mySink");
+   printSink.setRuntimeContext(ctx);
+   try {
+   printSink.open(new Configuration());
+   } catch (Exception e) {
+   Assert.fail();
+   }
+   printSink.invoke("hello world!", 
SinkContextUtil.forTimestamp(0));
+
+   assertEquals("Print to System.out", printSink.toString());
+   assertEquals("mySink:2> hello world!" + line, baos.toString());
+
+   printSink.setTargetToStandardErr();
+   assertEquals("Print to System.err", printSink.toString());
+   assertEquals("mySink:2> hello world!" + line, baos.toString());
+
+   printSink.close();
+   stream.close();
+   }
+
+   @Test
+   public void testPrintSinkWithIdentifierButNoPrefix() throws Exception {
+   ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
 Review comment:
   There is quite a lot of code duplication in those tests and they were 
unnecessarily using Mockito instead of a proper mock. There was also a bug 
in `testPrintSinkStdErr`. I have fixed those issues in a hotfix: 
https://github.com/apache/flink/pull/6538
   
   I would like the hotfix to be merged before this PR; please adapt/rewrite your 
test in a similar fashion as I did in my hotfix.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209237840
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -70,15 +83,29 @@ public void open(Configuration parameters) throws 
Exception {
// get the target stream
stream = target == STD_OUT ? System.out : System.err;
 
+   /**
+* Four possible format options:
+*  sinkId:taskId> output  <- sink id provided, parallelism 
> 1
+*  sinkId> output <- sink id provided, parallelism 
== 1
+*  taskId> output <- no sink id provided, 
parallelism > 1
+*  output <- no sink id provided, 
parallelism == 1
+*/
+
// set the prefix if we have a >1 parallelism
prefix = (context.getNumberOfParallelSubtasks() > 1) ?
((context.getIndexOfThisSubtask() + 1) + "> ") 
: null;
+
+   if (prefix == null) {
 
 Review comment:
   Don't use nulls here. In this case we can easily use an empty string for the 
same purpose and it will be safer (no possible NPE).
   
   Btw, rephrasing this logic like that:
   ```
   completedPrefix = sinkIdentifier;
   
   if (context.getNumberOfParallelSubtasks() > 1) {
 if (!completedPrefix.isEmpty()) {
   completedPrefix += ":";
 }
 completedPrefix += (context.getIndexOfThisSubtask() + 1);
   }
   
   if (!completedPrefix.isEmpty()) {
 completedPrefix += ">";
   }
   ```
   (optionally with a ternary operator instead of some of the 'if' statements - that's 
only a matter of taste) simplifies the logic and deduplicates some of the 
code/constants.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209234901
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -70,15 +83,29 @@ public void open(Configuration parameters) throws 
Exception {
// get the target stream
stream = target == STD_OUT ? System.out : System.err;
 
+   /**
 
 Review comment:
   please move this to the class's javadoc. Also replace `sinkId` / `sink id` with 
`{@code sinkIdentifier}`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209238315
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java
 ##
 @@ -89,6 +116,8 @@ public void invoke(IN record) {
public void close() {
this.stream = null;
this.prefix = null;
+   this.sinkIdentifier = null;
+   this.completedPrefix = null;
 
 Review comment:
   Please drop this `close` method. It doesn't do anything besides causing 
an `NPE`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream

2018-08-10 Thread GitBox
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a 
string to the print method to identify output for DataStream
URL: https://github.com/apache/flink/pull/6367#discussion_r209239178
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 ##
 @@ -117,6 +117,65 @@ public void testPrintSinkWithPrefix() throws Exception {
stream.close();
}
 
+   @Test
+   public void testPrintSinkWithIdentifierAndPrefix() throws Exception {
+   ByteArrayOutputStream baos = new ByteArrayOutputStream();
+   PrintStream stream = new PrintStream(baos);
+   System.setOut(stream);
+
+   final StreamingRuntimeContext ctx = 
Mockito.mock(StreamingRuntimeContext.class);
 
 Review comment:
   Do not use Mockito for such things; it makes such tests very difficult to 
maintain in the future. Instead please move the 
`org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseTest.MockRuntimeContext`
 class to the `flink-streaming-java` module, into some utility package, and reuse it 
throughout this whole `PrintSinkFunctionTest`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576269#comment-16576269
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209251987
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ##
 @@ -204,227 +133,78 @@ public NodeId put(
 * @throws Exception Thrown if the system cannot access the state.
 */
public boolean isEmpty() throws Exception {
-   return Iterables.isEmpty(eventsBuffer.keys());
+   return Iterables.isEmpty(eventsBuffer.keys()) && 
Iterables.isEmpty(eventsBufferCache.keySet());
}
 
/**
-* Returns all elements from the previous relation starting at the 
given entry.
-*
-* @param nodeId  id of the starting entry
-* @param version Version of the previous relation which shall be 
extracted
-* @return Collection of previous relations starting with the given 
value
-* @throws Exception Thrown if the system cannot access the state.
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
 */
-   public List>> extractPatterns(
-   final NodeId nodeId,
-   final DeweyNumber version) throws Exception {
-
-   List>> result = new ArrayList<>();
-
-   // stack to remember the current extraction states
-   Stack extractionStates = new Stack<>();
-
-   // get the starting shared buffer entry for the previous 
relation
-   Lockable entryLock = entries.get(nodeId);
-
-   if (entryLock != null) {
-   SharedBufferNode entry = entryLock.getElement();
-   extractionStates.add(new 
ExtractionState(Tuple2.of(nodeId, entry), version, new Stack<>()));
-
-   // use a depth first search to reconstruct the previous 
relations
-   while (!extractionStates.isEmpty()) {
-   final ExtractionState extractionState = 
extractionStates.pop();
-   // current path of the depth first search
-   final Stack> 
currentPath = extractionState.getPath();
-   final Tuple2 
currentEntry = extractionState.getEntry();
-
-   // termination criterion
-   if (currentEntry == null) {
-   final Map> 
completePath = new LinkedHashMap<>();
-
-   while (!currentPath.isEmpty()) {
-   final NodeId currentPathEntry = 
currentPath.pop().f0;
-
-   String page = 
currentPathEntry.getPageName();
-   List values = 
completePath
-   .computeIfAbsent(page, 
k -> new ArrayList<>());
-   
values.add(currentPathEntry.getEventId());
-   }
-   result.add(completePath);
-   } else {
-
-   // append state to the path
-   currentPath.push(currentEntry);
-
-   boolean firstMatch = true;
-   for (SharedBufferEdge edge : 
currentEntry.f1.getEdges()) {
-   // we can only proceed if the 
current version is compatible to the version
-   // of this previous relation
-   final DeweyNumber 
currentVersion = extractionState.getVersion();
-   if 
(currentVersion.isCompatibleWith(edge.getDeweyNumber())) {
-   final NodeId target = 
edge.getTarget();
-   Stack> newPath;
-
-   if (firstMatch) {
-   // for the 
first match we don't have to copy the current path
-   newPath = 
currentPath;
-   firstMatch = 
false;
-   } else {
-   

[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576268#comment-16576268
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209249252
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ##
 @@ -204,227 +133,78 @@ public NodeId put(
 * @throws Exception Thrown if the system cannot access the state.
 */
public boolean isEmpty() throws Exception {
-   return Iterables.isEmpty(eventsBuffer.keys());
+   return Iterables.isEmpty(eventsBuffer.keys()) && 
Iterables.isEmpty(eventsBufferCache.keySet());
 
 Review comment:
   Check the cache first. If there is something in the cache, we won't need to 
access the state at all.
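   
   i.e. roughly (a sketch of the suggested ordering, reusing the names from the quoted diff):
   ```
   public boolean isEmpty() throws Exception {
       // consult the in-memory cache first; if it already holds entries we can answer
       // without touching the (possibly RocksDB-backed) keyed state at all
       return Iterables.isEmpty(eventsBufferCache.keySet())
           && Iterables.isEmpty(eventsBuffer.keys());
   }
   ```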


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of the shared buffer in FLINK-9418, the lock & release operations 
> now work against RocksDB state, unlike the previous version which read the whole 
> shared buffer state into memory. I think we can add a cache or variable in the 
> shared buffer to cache the Lockable objects and track the reference changes within 
> one NFA processing pass; this would reduce the number of state accesses when events 
> point to the same NodeId. The result can then be flushed to the MapState at the end 
> of processing.
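
A very rough sketch of the cache-and-flush idea described above (the class and the 
{{StateWriter}} stand-in are assumptions for illustration, not the actual FLINK-9642 code):

```
import java.util.HashMap;
import java.util.Map;

// Buffer per-event modifications in memory and write them to the keyed state only once,
// when processing of the current event has finished.
final class DeferredStateCache<K, V> {
    private final Map<K, V> cache = new HashMap<>();

    void put(K key, V value) {
        cache.put(key, value);
    }

    V get(K key) {
        return cache.get(key);
    }

    /** Flush all buffered entries into the backing state and clear the cache. */
    void flushTo(StateWriter<K, V> state) throws Exception {
        for (Map.Entry<K, V> entry : cache.entrySet()) {
            state.put(entry.getKey(), entry.getValue());
        }
        cache.clear();
    }

    /** Minimal stand-in for a MapState-style put, to keep the sketch self-contained. */
    interface StateWriter<K, V> {
        void put(K key, V value) throws Exception;
    }
}
```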



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576254#comment-16576254
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209234513
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
 ##
 @@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import static 
org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A shared buffer implementation which stores values under according state. 
Additionally, the values can be
+ * versioned such that it is possible to retrieve their predecessor element in 
the buffer.
+ *
+ * The idea of the implementation is to have a buffer for incoming events 
with unique ids assigned to them. This way
+ * we do not need to deserialize events during processing and we store only 
one copy of the event.
+ *
+ * The entries in {@link SharedBufferAccessor} are {@link 
SharedBufferNode}. The shared buffer node allows to store
+ * relations between different entries. A dewey versioning scheme allows to 
discriminate between
+ * different relations (e.g. preceding element).
+ *
+ * The implementation is strongly based on the paper "Efficient Pattern 
Matching over Event Streams".
+ *
+ * @param <V> Type of the values
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
+ * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ */
+public class SharedBufferAccessor<V> implements AutoCloseable {
+
+   /** The cache of sharedBuffer.*/
+   private SharedBuffer sharedBuffer;
+
+   public SharedBufferAccessor(SharedBuffer sharedBuffer) {
+   this.sharedBuffer = sharedBuffer;
+   }
+
+   public void setSharedBuffer(SharedBuffer sharedBuffer) {
+   this.sharedBuffer = sharedBuffer;
+   }
+
+   /**
+* Notifies shared buffer that there will be no events with timestamp <= 
the given value. It allows to clear
+* internal counters for number of events seen so far per timestamp.
+*
+* @param timestamp watermark, no earlier events will arrive
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void advanceTime(long timestamp) throws Exception {
+   sharedBuffer.advanceTime(timestamp);
+   }
+
+   /**
+* Adds another unique event to the shared buffer and assigns a unique 
id for it. It automatically creates a
+* lock on this event, so it won't be removed during processing of that 
event. Therefore the lock should be removed
+* after processing all {@link 
org.apache.flink.cep.nfa.ComputationState}s
+*
+* NOTE:Should be called only once for each unique event!
+*
+* @param value event to be registered
+* @return unique id of that event that should be used when putting 
entries to the buffer.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public EventId registerEvent(V value, long timestamp) throws Exception {
+   return sharedBuffer.registerEvent(value, timestamp);
+   }
+
+   /**
+* Stores given value (value + timestamp) under the given state. It 
assigns a preceding element
+* relation to the previous entry.
+*
+* @param stateName  name of the state that the 

[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576262#comment-16576262
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209248594
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ##
 @@ -204,227 +133,78 @@ public NodeId put(
 * @throws Exception Thrown if the system cannot access the state.
 */
public boolean isEmpty() throws Exception {
-   return Iterables.isEmpty(eventsBuffer.keys());
+   return Iterables.isEmpty(eventsBuffer.keys()) && 
Iterables.isEmpty(eventsBufferCache.keySet());
}
 
/**
-* Returns all elements from the previous relation starting at the 
given entry.
-*
-* @param nodeId  id of the starting entry
-* @param version Version of the previous relation which shall be 
extracted
-* @return Collection of previous relations starting with the given 
value
-* @throws Exception Thrown if the system cannot access the state.
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
 */
-   public List>> extractPatterns(
-   final NodeId nodeId,
-   final DeweyNumber version) throws Exception {
-
-   List>> result = new ArrayList<>();
-
-   // stack to remember the current extraction states
-   Stack extractionStates = new Stack<>();
-
-   // get the starting shared buffer entry for the previous 
relation
-   Lockable entryLock = entries.get(nodeId);
-
-   if (entryLock != null) {
-   SharedBufferNode entry = entryLock.getElement();
-   extractionStates.add(new 
ExtractionState(Tuple2.of(nodeId, entry), version, new Stack<>()));
-
-   // use a depth first search to reconstruct the previous 
relations
-   while (!extractionStates.isEmpty()) {
-   final ExtractionState extractionState = 
extractionStates.pop();
-   // current path of the depth first search
-   final Stack> 
currentPath = extractionState.getPath();
-   final Tuple2 
currentEntry = extractionState.getEntry();
-
-   // termination criterion
-   if (currentEntry == null) {
-   final Map> 
completePath = new LinkedHashMap<>();
-
-   while (!currentPath.isEmpty()) {
-   final NodeId currentPathEntry = 
currentPath.pop().f0;
-
-   String page = 
currentPathEntry.getPageName();
-   List values = 
completePath
-   .computeIfAbsent(page, 
k -> new ArrayList<>());
-   
values.add(currentPathEntry.getEventId());
-   }
-   result.add(completePath);
-   } else {
-
-   // append state to the path
-   currentPath.push(currentEntry);
-
-   boolean firstMatch = true;
-   for (SharedBufferEdge edge : 
currentEntry.f1.getEdges()) {
-   // we can only proceed if the 
current version is compatible to the version
-   // of this previous relation
-   final DeweyNumber 
currentVersion = extractionState.getVersion();
-   if 
(currentVersion.isCompatibleWith(edge.getDeweyNumber())) {
-   final NodeId target = 
edge.getTarget();
-   Stack> newPath;
-
-   if (firstMatch) {
-   // for the 
first match we don't have to copy the current path
-   newPath = 
currentPath;
-   firstMatch = 
false;
-   } else {
-   

[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576266#comment-16576266
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209234729
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
 ##
 @@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import static 
org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A shared buffer implementation which stores values under according state. 
Additionally, the values can be
+ * versioned such that it is possible to retrieve their predecessor element in 
the buffer.
+ *
+ * The idea of the implementation is to have a buffer for incoming events 
with unique ids assigned to them. This way
+ * we do not need to deserialize events during processing and we store only 
one copy of the event.
+ *
+ * The entries in {@link SharedBufferAccessor} are {@link 
SharedBufferNode}. The shared buffer node allows to store
+ * relations between different entries. A dewey versioning scheme allows to 
discriminate between
+ * different relations (e.g. preceding element).
+ *
+ * The implementation is strongly based on the paper "Efficient Pattern 
Matching over Event Streams".
+ *
+ * @param <V> Type of the values
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
+ * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ */
+public class SharedBufferAccessor<V> implements AutoCloseable {
+
+   /** The cache of sharedBuffer.*/
+   private SharedBuffer sharedBuffer;
+
+   public SharedBufferAccessor(SharedBuffer sharedBuffer) {
+   this.sharedBuffer = sharedBuffer;
+   }
+
+   public void setSharedBuffer(SharedBuffer sharedBuffer) {
+   this.sharedBuffer = sharedBuffer;
+   }
+
+   /**
+* Notifies shared buffer that there will be no events with timestamp <= 
the given value. It allows to clear
+* internal counters for number of events seen so far per timestamp.
+*
+* @param timestamp watermark, no earlier events will arrive
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void advanceTime(long timestamp) throws Exception {
+   sharedBuffer.advanceTime(timestamp);
+   }
+
+   /**
+* Adds another unique event to the shared buffer and assigns a unique 
id for it. It automatically creates a
+* lock on this event, so it won't be removed during processing of that 
event. Therefore the lock should be removed
+* after processing all {@link 
org.apache.flink.cep.nfa.ComputationState}s
+*
+* NOTE:Should be called only once for each unique event!
+*
+* @param value event to be registered
+* @return unique id of that event that should be used when putting 
entries to the buffer.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public EventId registerEvent(V value, long timestamp) throws Exception {
+   return sharedBuffer.registerEvent(value, timestamp);
+   }
+
+   /**
+* Stores given value (value + timestamp) under the given state. It 
assigns a preceding element
+* relation to the previous entry.
+*
+* @param stateName  name of the state that the 

[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576252#comment-16576252
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209233907
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
 ##
 @@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import static 
org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A shared buffer implementation which stores values under according state. 
Additionally, the values can be
+ * versioned such that it is possible to retrieve their predecessor element in 
the buffer.
+ *
+ * The idea of the implementation is to have a buffer for incoming events 
with unique ids assigned to them. This way
+ * we do not need to deserialize events during processing and we store only 
one copy of the event.
+ *
+ * The entries in {@link SharedBufferAccessor} are {@link 
SharedBufferNode}. The shared buffer node allows to store
+ * relations between different entries. A dewey versioning scheme allows to 
discriminate between
+ * different relations (e.g. preceding element).
+ *
+ * The implementation is strongly based on the paper "Efficient Pattern 
Matching over Event Streams".
+ *
+ * @param <V> Type of the values
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
+ * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ */
+public class SharedBufferAccessor<V> implements AutoCloseable {
+
+   /** The cache of sharedBuffer.*/
+   private SharedBuffer sharedBuffer;
+
+   public SharedBufferAccessor(SharedBuffer sharedBuffer) {
+   this.sharedBuffer = sharedBuffer;
+   }
+
+   public void setSharedBuffer(SharedBuffer sharedBuffer) {
+   this.sharedBuffer = sharedBuffer;
+   }
+
+   /**
+* Notifies shared buffer that there will be no events with timestamp <= 
the given value. It allows to clear
+* internal counters for number of events seen so far per timestamp.
+*
+* @param timestamp watermark, no earlier events will arrive
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void advanceTime(long timestamp) throws Exception {
+   sharedBuffer.advanceTime(timestamp);
+   }
+
+   /**
+* Adds another unique event to the shared buffer and assigns a unique 
id for it. It automatically creates a
+* lock on this event, so it won't be removed during processing of that 
event. Therefore the lock should be removed
+* after processing all {@link 
org.apache.flink.cep.nfa.ComputationState}s
+*
+* NOTE:Should be called only once for each unique event!
+*
+* @param value event to be registered
+* @return unique id of that event that should be used when putting 
entries to the buffer.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public EventId registerEvent(V value, long timestamp) throws Exception {
+   return sharedBuffer.registerEvent(value, timestamp);
+   }
+
+   /**
+* Stores given value (value + timestamp) under the given state. It 
assigns a preceding element
+* relation to the previous entry.
+*
+* @param stateName  name of the state that the 
