[jira] [Commented] (FLINK-10123) Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client
[ https://issues.apache.org/jira/browse/FLINK-10123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577019#comment-16577019 ] ASF GitHub Bot commented on FLINK-10123: yanghua commented on issue #6539: [FLINK-10123] Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client URL: https://github.com/apache/flink/pull/6539#issuecomment-412244963 Till, Scala checkstyle error: ``` error file=/home/travis/build/apache/flink/flink-runtime/src/main/scala/akka/actor/RobustActorSystem.scala message=File line length exceeds 100 characters line=88 ``` Other Travis build tasks also failed because of connection timeouts. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client > -- > > Key: FLINK-10123 > URL: https://issues.apache.org/jira/browse/FLINK-10123 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Instead of using the {{DefaultThreadFactory}} in the > {{RestServerEndpoint}}/{{RestClient}} we should use the > {{ExecutorThreadFactory}} because it uses the {{FatalExitExceptionHandler}} > by default as the uncaught exception handler. This should guard against > uncaught exceptions by simply terminating the JVM. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
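The ticket above argues for a thread factory whose threads carry a process-killing uncaught-exception handler. A minimal Java sketch of that idea (class, pool, and exit-code choices are illustrative assumptions; this is not Flink's actual ExecutorThreadFactory or FatalExitExceptionHandler):

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Every thread created by this factory gets an uncaught-exception handler
// that terminates the JVM, so a dying REST server/client thread cannot
// leave the process in a silently broken state.
public class ExitingThreadFactory implements ThreadFactory {
    private final String poolName;
    private final AtomicInteger counter = new AtomicInteger(1);

    public ExitingThreadFactory(String poolName) {
        this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread t = new Thread(runnable, poolName + "-thread-" + counter.getAndIncrement());
        t.setDaemon(true);
        // Equivalent in spirit to installing FatalExitExceptionHandler:
        // log the failure, then halt the JVM so the supervisor restarts it.
        t.setUncaughtExceptionHandler((thread, error) -> {
            System.err.println("Thread " + thread.getName() + " died: " + error);
            Runtime.getRuntime().halt(-17);
        });
        return t;
    }
}
```

The deciding property is that `halt` runs in the `finally`-like path of the handler, so the process never continues with a missing worker thread.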
[jira] [Assigned] (FLINK-10125) Unclosed ByteArrayDataOutputView in RocksDBMapState
[ https://issues.apache.org/jira/browse/FLINK-10125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-10125: Assignee: vinoyang > Unclosed ByteArrayDataOutputView in RocksDBMapState > --- > > Key: FLINK-10125 > URL: https://issues.apache.org/jira/browse/FLINK-10125 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: vinoyang >Priority: Minor > > {code} > ByteArrayDataOutputView dov = new ByteArrayDataOutputView(1); > {code} > {{dov}} is used in a try block but is not closed in case of an exception. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
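The fix pattern the ticket calls for can be illustrated with plain JDK streams standing in for `ByteArrayDataOutputView`: allocate the view in a try-with-resources block so it is closed on both the normal and the exceptional path. Names here are illustrative, not the actual RocksDBMapState code:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class CloseOnException {
    static byte[] serialize(int value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // The view is closed even if writeInt throws, which is exactly
        // what the manual try block in the reported code fails to do.
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }
}
```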
[jira] [Commented] (FLINK-5315) Support distinct aggregations in table api
[ https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577011#comment-16577011 ] ASF GitHub Bot commented on FLINK-5315: --- hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream URL: https://github.com/apache/flink/pull/6521#discussion_r209413346 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala ## @@ -136,4 +136,11 @@ abstract class AggregateFunction[T, ACC] extends UserDefinedFunction { * accumulator type should be automatically inferred. */ def getAccumulatorType: TypeInformation[ACC] = null + + private[flink] var isDistinctAgg: Boolean = false + + private[flink] def distinct: AggregateFunction[T, ACC] = { Review comment: good catch! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support distinct aggregations in table api > -- > > Key: FLINK-5315 > URL: https://issues.apache.org/jira/browse/FLINK-5315 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Kurt Young >Assignee: Rong Rong >Priority: Major > Labels: pull-request-available > > Support distinct aggregations in Table API in the following format: > For Expressions: > {code:scala} > 'a.count.distinct // Expressions distinct modifier > {code} > For User-defined Function: > {code:scala} > singleArgUdaggFunc.distinct('a) // FunctionCall distinct modifier > multiArgUdaggFunc.distinct('a, 'b) // FunctionCall distinct modifier > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
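Conceptually, a distinct modifier wraps an aggregate so that duplicate input values never reach the accumulate function. A hedged Java sketch of that semantics (Flink's planner implements distinct aggregation differently, via query rewriting; the class below is purely illustrative):

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.BiFunction;

// Wraps an accumulate function and forwards only first occurrences of each
// value, which is the observable behavior of e.g. 'a.count.distinct.
public class DistinctAgg<T, ACC> {
    private final Set<T> seen = new HashSet<>();
    private final BiFunction<ACC, T, ACC> accumulate;

    public DistinctAgg(BiFunction<ACC, T, ACC> accumulate) {
        this.accumulate = accumulate;
    }

    public ACC add(ACC acc, T value) {
        // Duplicate inputs leave the accumulator untouched.
        return seen.add(value) ? accumulate.apply(acc, value) : acc;
    }
}
```

A distinct count over the inputs 1, 1, 2 would accumulate only twice.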
[jira] [Created] (FLINK-10125) Unclosed ByteArrayDataOutputView in RocksDBMapState
Ted Yu created FLINK-10125: -- Summary: Unclosed ByteArrayDataOutputView in RocksDBMapState Key: FLINK-10125 URL: https://issues.apache.org/jira/browse/FLINK-10125 Project: Flink Issue Type: Bug Reporter: Ted Yu {code} ByteArrayDataOutputView dov = new ByteArrayDataOutputView(1); {code} {{dov}} is used in a try block but is not closed in case of an exception. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes
[ https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-10041. -- Resolution: Implemented Fix Version/s: 1.7.0 Merged in: master: 9d67afbb84 > Extract all different iterators (inner or static inner classes) into full > classes > - > > Key: FLINK-10041 > URL: https://issues.apache.org/jira/browse/FLINK-10041 > Project: Flink > Issue Type: Sub-task >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10124) Use ByteArrayDataInput/OutputView instead of stream + wrapper
[ https://issues.apache.org/jira/browse/FLINK-10124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-10124. -- Resolution: Implemented Fix Version/s: 1.7.0 Merged in: master: 18ff4ab > Use ByteArrayDataInput/OutputView instead of stream + wrapper > - > > Key: FLINK-10124 > URL: https://issues.apache.org/jira/browse/FLINK-10124 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes
[ https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576664#comment-16576664 ] ASF GitHub Bot commented on FLINK-10041: asfgit closed pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner … URL: https://github.com/apache/flink/pull/6501 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java index 33836f0c781..698a9f97dc0 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java @@ -53,4 +53,8 @@ public void setPosition(int pos) { public void setData(@Nonnull byte[] buffer, int offset, int length) { inStreamWithPos.setBuffer(buffer, offset, length); } + + public void setData(@Nonnull byte[] buffer) { + setData(buffer, 0, buffer.length); + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java index 32819f84e46..2a9ab7589a9 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java @@ -20,8 +20,6 @@ import org.apache.flink.api.common.state.State; import 
org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.runtime.state.internal.InternalAppendingState; import org.apache.flink.util.FlinkRuntimeException; @@ -63,7 +61,8 @@ SV getInternal(byte[] key) { if (valueBytes == null) { return null; } - return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes))); + dataInputView.setData(valueBytes); + return valueSerializer.deserialize(dataInputView); } catch (IOException | RocksDBException e) { throw new FlinkRuntimeException("Error while retrieving data from RocksDB", e); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index 7483089106f..65b7f1fa4a7 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -20,9 +20,8 @@ import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.core.memory.ByteArrayDataInputView; +import org.apache.flink.core.memory.ByteArrayDataOutputView; import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import 
org.apache.flink.runtime.state.internal.InternalKvState; @@ -67,9 +66,9 @@ protected final WriteOptions writeOptions; - protected final ByteArrayOutputStreamWithPos keySerializationStream; + protected final ByteArrayDataOutputView dataOutputView; - protected final DataOutputView keySerializationDataOutputView; + protected final ByteArrayDataInputView dataInputView; private final boolean ambiguousKeyPossible; @@ -98,9 +97,10 @@ protected AbstractRocksDBState( this.valueSerializer = Preconditions.checkNotNull(valueSerializer, "State value serializer"); this.defaultValue = defaultValue; - this.keySerializationStream = new
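The pattern in the diff above: instead of allocating a fresh stream plus wrapper for every state access, one reusable view is re-pointed at the incoming byte array via `setData()`. A JDK-only sketch of such a reusable view (illustrative, not Flink's actual ByteArrayDataInputView):

```java
// Minimal reusable input view: setData() resets the same object onto a new
// byte array, removing the per-read allocations on the RocksDB state hot path.
public class ReusableByteArrayInput {
    private byte[] buffer;
    private int position;

    public void setData(byte[] buffer) {
        this.buffer = buffer;
        this.position = 0;
    }

    // Reads a big-endian int at the current position, as DataInput would.
    public int readInt() {
        int v = ((buffer[position] & 0xFF) << 24)
              | ((buffer[position + 1] & 0xFF) << 16)
              | ((buffer[position + 2] & 0xFF) << 8)
              |  (buffer[position + 3] & 0xFF);
        position += 4;
        return v;
    }
}
```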
[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes
[ https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576660#comment-16576660 ] ASF GitHub Bot commented on FLINK-10041: StefanRRichter commented on issue #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner … URL: https://github.com/apache/flink/pull/6501#issuecomment-412162109 @bowenli86 @azagrebin thanks for the reviews. Merging. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Extract all different iterators (inner or static inner classes) into full > classes > - > > Key: FLINK-10041 > URL: https://issues.apache.org/jira/browse/FLINK-10041 > Project: Flink > Issue Type: Sub-task >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10124) Use ByteArrayDataInput/OutputView instead of stream + wrapper
Stefan Richter created FLINK-10124: -- Summary: Use ByteArrayDataInput/OutputView instead of stream + wrapper Key: FLINK-10124 URL: https://issues.apache.org/jira/browse/FLINK-10124 Project: Flink Issue Type: Sub-task Components: State Backends, Checkpointing Reporter: Stefan Richter Assignee: Stefan Richter -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes
[ https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576601#comment-16576601 ] ASF GitHub Bot commented on FLINK-10041: azagrebin commented on issue #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner … URL: https://github.com/apache/flink/pull/6501#issuecomment-412147803 LGTM This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Extract all different iterators (inner or static inner classes) into full > classes > - > > Key: FLINK-10041 > URL: https://issues.apache.org/jira/browse/FLINK-10041 > Project: Flink > Issue Type: Sub-task >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10074) Allowable number of checkpoint failures
[ https://issues.apache.org/jira/browse/FLINK-10074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576588#comment-16576588 ] vinoyang commented on FLINK-10074: -- [~trohrm...@apache.org] Does this need more discussion? If not, I will start this work. > Allowable number of checkpoint failures > > > Key: FLINK-10074 > URL: https://issues.apache.org/jira/browse/FLINK-10074 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Thomas Weise >Assignee: vinoyang >Priority: Major > > For intermittent checkpoint failures it is desirable to have a mechanism to > avoid restarts. If, for example, a transient S3 error prevents checkpoint > completion, the next checkpoint may very well succeed. The user may wish to > not incur the expense of a restart in such a scenario, and this could be > expressed with a failure threshold (number of subsequent checkpoint > failures), possibly combined with a list of exceptions to tolerate. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
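The proposed tolerance mechanism could be sketched as follows: count consecutive checkpoint failures and only trigger a restart once a configured threshold is exceeded, resetting the count on any success. Class and method names are assumptions, not Flink API:

```java
// Hypothetical failure-threshold policy for intermittent checkpoint errors.
public class CheckpointFailureTracker {
    private final int tolerableConsecutiveFailures;
    private int consecutiveFailures;

    public CheckpointFailureTracker(int tolerableConsecutiveFailures) {
        this.tolerableConsecutiveFailures = tolerableConsecutiveFailures;
    }

    public void onCheckpointSuccess() {
        // A successful checkpoint forgives earlier transient failures.
        consecutiveFailures = 0;
    }

    /** Returns true when the job should actually be restarted. */
    public boolean onCheckpointFailure() {
        return ++consecutiveFailures > tolerableConsecutiveFailures;
    }
}
```

The exception-list refinement mentioned in the ticket would amount to consulting the thrown exception type before incrementing the counter.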
[jira] [Commented] (FLINK-10048) Migrate module flink-gelly-examples to flink-examples and rename it
[ https://issues.apache.org/jira/browse/FLINK-10048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576578#comment-16576578 ] vinoyang commented on FLINK-10048: -- [~trohrm...@apache.org] Any opinions? If not, I will start this work. > Migrate module flink-gelly-examples to flink-examples and rename it > --- > > Key: FLINK-10048 > URL: https://issues.apache.org/jira/browse/FLINK-10048 > Project: Flink > Issue Type: Improvement >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > I think we can put all the example modules into the flink-examples module. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576573#comment-16576573 ] ASF GitHub Bot commented on FLINK-9850: --- yanghua commented on issue #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#issuecomment-412144209 Hi @pnowojski, thanks for your suggestion. I have refactored this PR; we both changed the `PrintSinkFunctionTest`, so I may not have finished all of the work. What's more, I agree with the opinion about "deduplicate the logic of PrintSinkFunction and PrintingOutputFormat"; I'd like to start that work once the current issue is fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Hequn Cheng >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The output of the print method of {{DataSet}} allows the user to supply a > String to identify the output (see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But > {{DataStream}} doesn't support this yet. It is valuable to add this feature for > {{DataStream}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
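What the feature request amounts to, sketched in Java: a print sink that prefixes each record with a user-supplied identifier, mirroring what DataSet's print already supports. The exact output format below is an assumption, not Flink's specified behavior:

```java
// Hypothetical record formatter for a DataStream#print(String) overload:
// each record is prefixed with the sink identifier so interleaved output
// from several print() sinks can be told apart.
public class PrefixedPrinter<T> {
    private final String sinkIdentifier;

    public PrefixedPrinter(String sinkIdentifier) {
        this.sinkIdentifier = sinkIdentifier;
    }

    public String format(T record) {
        return sinkIdentifier + "> " + record;
    }
}
```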
[jira] [Commented] (FLINK-5232) Add a Thread default uncaught exception handler on the JobManager
[ https://issues.apache.org/jira/browse/FLINK-5232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576557#comment-16576557 ] ASF GitHub Bot commented on FLINK-5232: --- yanghua commented on issue #6334: [FLINK-5232] Add a Thread default uncaught exception handler on the JobManager URL: https://github.com/apache/flink/pull/6334#issuecomment-412140577 So, should I close this PR? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a Thread default uncaught exception handler on the JobManager > - > > Key: FLINK-5232 > URL: https://issues.apache.org/jira/browse/FLINK-5232 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: Stephan Ewen >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > When some JobManager threads die because of uncaught exceptions, we should > bring down the JobManager. If a thread dies from an uncaught exception, there > is a high chance that the JobManager becomes dysfunctional. > The only safe thing is to rely on the JobManager being restarted by YARN / > Mesos / Kubernetes / etc. > I suggest adding this code to the JobManager launch: > {code} > Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { > @Override > public void uncaughtException(Thread t, Throwable e) { > try { > LOG.error("Thread {} died due to an uncaught exception. Killing > process.", t.getName()); > } finally { > Runtime.getRuntime().halt(-1); > } > } > }); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5232) Add a Thread default uncaught exception handler on the JobManager
[ https://issues.apache.org/jira/browse/FLINK-5232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576556#comment-16576556 ] ASF GitHub Bot commented on FLINK-5232: --- yanghua commented on issue #6334: [FLINK-5232] Add a Thread default uncaught exception handler on the JobManager URL: https://github.com/apache/flink/pull/6334#issuecomment-412140446 @tillrohrmann Good job, thanks for helping me extend it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a Thread default uncaught exception handler on the JobManager > - > > Key: FLINK-5232 > URL: https://issues.apache.org/jira/browse/FLINK-5232 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: Stephan Ewen >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > When some JobManager threads die because of uncaught exceptions, we should > bring down the JobManager. If a thread dies from an uncaught exception, there > is a high chance that the JobManager becomes dysfunctional. > The only safe thing is to rely on the JobManager being restarted by YARN / > Mesos / Kubernetes / etc. > I suggest adding this code to the JobManager launch: > {code} > Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { > @Override > public void uncaughtException(Thread t, Throwable e) { > try { > LOG.error("Thread {} died due to an uncaught exception. Killing > process.", t.getName()); > } finally { > Runtime.getRuntime().halt(-1); > } > } > }); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10123) Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client
[ https://issues.apache.org/jira/browse/FLINK-10123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576554#comment-16576554 ] ASF GitHub Bot commented on FLINK-10123: yanghua commented on issue #6539: [FLINK-10123] Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client URL: https://github.com/apache/flink/pull/6539#issuecomment-412139875 +1 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client > -- > > Key: FLINK-10123 > URL: https://issues.apache.org/jira/browse/FLINK-10123 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Instead of using the {{DefaultThreadFactory}} in the > {{RestServerEndpoint}}/{{RestClient}} we should use the > {{ExecutorThreadFactory}} because it uses the {{FatalExitExceptionHandler}} > per default as the uncaught exception handler. This should guard against > uncaught exceptions by simply terminating the JVM. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576527#comment-16576527 ] ASF GitHub Bot commented on FLINK-6968: --- xhumanoid commented on a change in pull request #6434: [FLINK-6968] [table] Add Queryable table sink. URL: https://github.com/apache/flink/pull/6434#discussion_r209314950 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sink/queryable/QueryableTableSinkTest.scala ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.stream.sink.queryable + +import java.time.Duration +import java.util.concurrent.{ExecutionException, TimeUnit} + +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.api.common.time.{Deadline, Time} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.scala._ +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend +import org.apache.flink.queryablestate.client.QueryableStateClient +import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException +import org.apache.flink.runtime.state.StateBackend +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment} +import org.apache.flink.table.runtime.harness.HarnessTestBase.TestStreamQueryConfig +import org.apache.flink.table.sinks.queryable.QueryableTableSink +import org.apache.flink.types.Row +import org.hamcrest.core.Is +import org.junit.Assert._ +import org.junit.rules.{ExpectedException, TemporaryFolder} +import org.junit.{Rule, Test} + + +class QueryableTableSinkTest extends QueryableSinkTestBase { + + private val queryConfig = new StreamQueryConfig() + queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2)) + + val _tempFolder = new TemporaryFolder + @Rule + def tempFolder: TemporaryFolder = _tempFolder + + val _expectedException = ExpectedException.none() + @Rule + def expectedException: ExpectedException = _expectedException + + def getStateBackend: StateBackend = { +val dbPath = tempFolder.newFolder().getAbsolutePath +val checkpointPath = tempFolder.newFolder().toURI.toString +val backend = new RocksDBStateBackend(checkpointPath) +backend.setDbStoragePath(dbPath) +backend + } + + @Test + def testQueryableSink(): Unit = { +val env = 
StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) + +//name, money +val data = List(("jeff", -1), ("dean", -2), ("jeff", 2), ("dean", 4)) +val source = new TestKVListSource[String, Int](data) + +// select name, sum(money) as sm from t group by name +val t = env.addSource(source).toTable(tEnv, 'name, 'money) +.groupBy("name") +.select("name, sum(money) as sm") + +val queryableSink = new QueryableTableSink("prefix", + new StreamQueryConfig().withIdleStateRetentionTime(Time.minutes(1), Time.minutes(7))) + +t.writeToSink(queryableSink) + +val clusterClient = QueryableSinkTestBase.miniClusterResource.getClusterClient +val deadline = Deadline.now.plus(Duration.ofSeconds(100)) + +val autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, env.getJavaEnv) +val client = new QueryableStateClient("localhost", 9084) + Review comment: Required shutdown client in the end This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Store streaming, updating tables with unique key in queryable state > --- > > Key: FLINK-6968 >
[jira] [Commented] (FLINK-5315) Support distinct aggregations in table api
[ https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576517#comment-16576517 ] ASF GitHub Bot commented on FLINK-5315: --- walterddr commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream URL: https://github.com/apache/flink/pull/6521#discussion_r209313630 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala ## @@ -136,4 +136,11 @@ abstract class AggregateFunction[T, ACC] extends UserDefinedFunction { * accumulator type should be automatically inferred. */ def getAccumulatorType: TypeInformation[ACC] = null + + private[flink] var isDistinctAgg: Boolean = false + + private[flink] def distinct: AggregateFunction[T, ACC] = { Review comment: This is not going to work because it modifies an underlying field of this particular AggregateFunction object. For example: ``` table.select(udagg.distinct('a), udagg('a)) ``` will return the same result in both columns, because the distinct modifier has been added to this particular `udagg` instance. This is a blunder on my end and I should fix it before further reviews can be conducted. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support distinct aggregations in table api > -- > > Key: FLINK-5315 > URL: https://issues.apache.org/jira/browse/FLINK-5315 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Kurt Young >Assignee: Rong Rong >Priority: Major > Labels: pull-request-available > > Support distinct aggregations in Table API in the following format: > For Expressions: > {code:scala} > 'a.count.distinct // Expressions distinct modifier > {code} > For User-defined Function: > {code:scala} > singleArgUdaggFunc.distinct('a) // FunctionCall distinct modifier > multiArgUdaggFunc.distinct('a, 'b) // FunctionCall distinct modifier > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
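The bug walterddr points out above — a `distinct` modifier that mutates the receiving aggregate instead of returning a copy — can be reduced to a few lines of plain Java. The class and method names below are hypothetical, used only to demonstrate the shared-mutable-state pitfall:

```java
// Minimal illustration of the review comment: a modifier that flips a
// field on `this` leaks into every other use of the same object.
class Agg {
    boolean isDistinct = false;

    // Buggy variant: mutates the receiver, so udagg.distinct() also
    // changes the plain udagg(...) call in the same select.
    Agg distinctBuggy() {
        this.isDistinct = true;
        return this;
    }

    // Fixed variant: returns a fresh instance and leaves the receiver alone.
    Agg distinctCopy() {
        Agg copy = new Agg();
        copy.isDistinct = true;
        return copy;
    }
}
```

With the buggy variant, `select(udagg.distinct('a), udagg('a))` effectively evaluates two references to the same mutated object, which is why both columns come back distinct.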
[jira] [Assigned] (FLINK-10001) Improve Kubernetes documentation
[ https://issues.apache.org/jira/browse/FLINK-10001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-10001: - Assignee: Till Rohrmann > Improve Kubernetes documentation > > > Key: FLINK-10001 > URL: https://issues.apache.org/jira/browse/FLINK-10001 > Project: Flink > Issue Type: Improvement > Components: Documentation, Kubernetes >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Fix For: 1.6.0 > > > We should update Flink's K8s documentation. This includes running it on > {{MiniKube}} as well as on a K8s cluster. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576452#comment-16576452 ] ASF GitHub Bot commented on FLINK-6968: --- xhumanoid commented on a change in pull request #6434: [FLINK-6968] [table] Add Queryable table sink. URL: https://github.com/apache/flink/pull/6434#discussion_r209300017 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/queryable/QueryableTableSink.scala ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks.queryable + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.sinks.{TableSinkBase, UpsertStreamTableSink} +import org.apache.flink.types.Row + +/** + * A QueryableTableSink stores table in queryable state. 
+ * + * This class stores table in queryable state so that users can access table data without + * dependency on external storage. + * + * Since this is only a kv storage, currently user can only do point query against it. + * + * Example: + * {{{ + * val env = ExecutionEnvironment.getExecutionEnvironment + * val tEnv = TableEnvironment.getTableEnvironment(env) + * + * val table: Table = ... + * + * val queryableTableSink: QueryableTableSink = new QueryableTableSink( + * "prefix", + * queryConfig) + * + * tEnv.writeToSink(table, queryableTableSink, config) + * }}} + * + * When program starts to run, user can access state with QueryableStateClient. + * {{{ + * val client = new QueryableStateClient(tmHostname, proxyPort) + * val data = client.getKvState( + * jobId, + * "prefix-column1", + * Row.of(1), + * new RowTypeInfo(Array(TypeInfoformation.of(classOf[Int]), Array("id")) + * stateDescriptor) + * .get(); + * + * }}} + * + * + * @param namePrefix + * @param queryConfig + */ +class QueryableTableSink( + private val namePrefix: String, + private val queryConfig: StreamQueryConfig) + extends UpsertStreamTableSink[Row] +with TableSinkBase[JTuple2[JBool, Row]] { + private var keys: Array[String] = _ + + override def setKeyFields(keys: Array[String]): Unit = { +if (keys == null) { + throw new IllegalArgumentException("keys can't be null!") +} +this.keys = keys + } + + override def setIsAppendOnly(isAppendOnly: JBool): Unit = { +if (isAppendOnly) { + throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " + +"tables as the table would grow infinitely") +} + } + + override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames) + + override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = { +val keyIndices = keys.map(getFieldNames.indexOf(_)) +val keyTypes = keyIndices.map(getFieldTypes(_)) + +val keySelectorType = new RowTypeInfo(keyTypes, keys) + +val processFunction = new 
QueryableStateProcessFunction( + namePrefix, + queryConfig, + keys, + getFieldNames, + getFieldTypes) + +dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType)) + .process(processFunction) + } + + override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = { +new QueryableTableSink(this.namePrefix, this.queryConfig) + } +} + + + + Review comment: remove block of blank lines This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Store streaming, updating tables with unique key in queryable state >
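The `UpsertStreamTableSink` contract that `QueryableTableSink` implements above receives `(Boolean, Row)` pairs: the flag says whether the row is an upsert or a deletion for its key. The plain-Java sketch below (not Flink's actual `QueryableStateProcessFunction`, which writes to keyed `ValueState`) shows just these upsert semantics against an in-memory map:

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual model of the sink's per-key behavior: an upsert message
// overwrites the entry for the key, a retraction message removes it.
class UpsertStore<K, V> {
    final Map<K, V> state = new HashMap<>();

    void process(boolean isUpsert, K key, V value) {
        if (isUpsert) {
            state.put(key, value);   // updated aggregate overwrites the old value
        } else {
            state.remove(key);       // deletion clears the queryable entry
        }
    }
}
```

In the real sink the map is Flink keyed state registered as queryable, which is what a `QueryableStateClient` later reads with point queries.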
[jira] [Resolved] (FLINK-9795) Update Mesos documentation for flip6
[ https://issues.apache.org/jira/browse/FLINK-9795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-9795. -- Resolution: Fixed Fix Version/s: 1.7.0 1.6.1 Fixed via 1.7.0: a442eb6c0388558c6fb2e5e616cd1cd15038b95c 1.6.1: 45cffa17252c5fb9bc38a6b771c2a75aaa8c10ee > Update Mesos documentation for flip6 > > > Key: FLINK-9795 > URL: https://issues.apache.org/jira/browse/FLINK-9795 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.5.0, 1.6.0 >Reporter: Leonid Ishimnikov >Assignee: Gary Yao >Priority: Major > Labels: pull-request-available > Fix For: 1.6.1, 1.7.0 > > > Mesos documentation would benefit from an overhaul after flip6 became the > default cluster management model. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] pnowojski commented on issue #6538: [hotfix][streaming] Fix and simplify PrintSinkFunctionTest
pnowojski commented on issue #6538: [hotfix][streaming] Fix and simplify PrintSinkFunctionTest URL: https://github.com/apache/flink/pull/6538#issuecomment-412118428 Failure is still there. It seems like there is some travis failure, because many branches (including master and release-1.5) are failing with similar errors. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-10105) Test failure because of jobmanager.execution.failover-strategy is outdated
[ https://issues.apache.org/jira/browse/FLINK-10105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-10105: -- Fix Version/s: 1.6.1 > Test failure because of jobmanager.execution.failover-strategy is outdated > -- > > Key: FLINK-10105 > URL: https://issues.apache.org/jira/browse/FLINK-10105 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.7.0 >Reporter: vinoyang >Assignee: Dawid Wysakowicz >Priority: Major > Fix For: 1.6.1, 1.7.0 > > > {code:java} > Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.745 sec <<< > FAILURE! - in > org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase > testFullReferenceCompleteness(org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase) > Time elapsed: 0.693 sec <<< FAILURE! > java.lang.AssertionError: Documentation is outdated, please regenerate it > according to the instructions in flink-docs/README.md. > Problems: > Documented description of > jobmanager.execution.failover-strategy in class > org.apache.flink.configuration.JobManagerOptions is outdated. > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase.compareDocumentedAndExistingOptions(ConfigOptionsDocsCompletenessITCase.java:118) > at > org.apache.flink.docs.configuration.ConfigOptionsDocsCompletenessITCase.testFullReferenceCompleteness(ConfigOptionsDocsCompletenessITCase.java:76) > Results : > Failed tests: > > ConfigOptionsDocsCompletenessITCase.testFullReferenceCompleteness:76->compareDocumentedAndExistingOptions:118 > Documentation is outdated, please regenerate it according to the > instructions in flink-docs/README.md. > Problems: > Documented description of > jobmanager.execution.failover-strategy in class > org.apache.flink.configuration.JobManagerOptions is outdated. > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-10105) Test failure because of jobmanager.execution.failover-strategy is outdated
[ https://issues.apache.org/jira/browse/FLINK-10105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-10105. --- Resolution: Fixed 1.6.1: fbcc4965b77488aca65ae833c09b85a7f3c79272 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-10105) Test failure because of jobmanager.execution.failover-strategy is outdated
[ https://issues.apache.org/jira/browse/FLINK-10105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reopened FLINK-10105: --- -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576440#comment-16576440 ] ASF GitHub Bot commented on FLINK-6968: --- xhumanoid commented on a change in pull request #6434: [FLINK-6968] [table] Add Queryable table sink. URL: https://github.com/apache/flink/pull/6434#discussion_r209297911 ## File path: flink-libraries/flink-table/pom.xml ## @@ -186,6 +186,13 @@ under the License. test test-jar + + + org.apache.flink + flink-queryable-state-runtime_2.11 Review comment: replace hardcoded 2.11 with ${scala.binary.version} This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Store streaming, updating tables with unique key in queryable state > --- > > Key: FLINK-6968 > URL: https://issues.apache.org/jira/browse/FLINK-6968 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Renjie Liu >Priority: Major > Labels: pull-request-available > > Streaming tables with unique key are continuously updated. For example > queries with a non-windowed aggregation generate such tables. Commonly, such > updating tables are emitted via an upsert table sink to an external datastore > (k-v store, database) to make it accessible to applications. > This issue is about adding a feature to store and maintain such a table as > queryable state in Flink. By storing the table in Flink's queryable state, we > do not need an external data store to access the results of the query but can > query the results directly from Flink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5232) Add a Thread default uncaught exception handler on the JobManager
[ https://issues.apache.org/jira/browse/FLINK-5232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576436#comment-16576436 ] ASF GitHub Bot commented on FLINK-5232: --- tillrohrmann commented on issue #6334: [FLINK-5232] Add a Thread default uncaught exception handler on the JobManager URL: https://github.com/apache/flink/pull/6334#issuecomment-412116926 Hi @yanghua, I think your approach goes in the right direction. Extending `ActorSystemImpl` allows us to set a custom uncaught exception handler. It is a bit tricky since we have to put our own `RobustActorSystem` in the same package as `ActorSystemImpl`. But I think this is ok since it only affects a single class. I've taken your work and extended it a bit in #6539 (second commit). What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a Thread default uncaught exception handler on the JobManager > - > > Key: FLINK-5232 > URL: https://issues.apache.org/jira/browse/FLINK-5232 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: Stephan Ewen >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > When some JobManager threads die because of uncaught exceptions, we should > bring down the JobManager. If a thread dies from an uncaught exception, there > is a high chance that the JobManager becomes dysfunctional. > The only safe thing is to rely on the JobManager being restarted by YARN / > Mesos / Kubernetes / etc. > I suggest to add this code to the JobManager launch: > {code} > Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { > @Override > public void uncaughtException(Thread t, Throwable e) { > try { > LOG.error("Thread {} died due to an uncaught exception. Killing > process.", t.getName()); > } finally { > Runtime.getRuntime().halt(-1); > } > } > }); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
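The handler proposed in the issue description can be exercised without actually killing the JVM by injecting the termination action. The following is a hedged sketch of that idea: the class name `FatalHandler` and the injected `Runnable` are illustrative stand-ins, not Flink's actual code, which calls `Runtime.getRuntime().halt(-1)` directly in the finally block.

```java
// Sketch of the uncaught exception handler proposed in FLINK-5232.
// The termination action is injected so it can be observed in tests;
// the real proposal halts the JVM via Runtime.getRuntime().halt(-1).
final class FatalHandler implements Thread.UncaughtExceptionHandler {
    private final Runnable terminator;

    FatalHandler(Runnable terminator) {
        this.terminator = terminator;
    }

    @Override
    public void uncaughtException(Thread t, Throwable e) {
        try {
            // Stand-in for the LOG.error(...) call in the original snippet.
            System.err.println("Thread " + t.getName()
                    + " died due to an uncaught exception. Killing process.");
        } finally {
            terminator.run();
        }
    }
}
```

Installing it JVM-wide is then a one-liner: `Thread.setDefaultUncaughtExceptionHandler(new FatalHandler(() -> Runtime.getRuntime().halt(-1)));`.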
[GitHub] pnowojski commented on a change in pull request #6538: [hotfix][streaming] Fix and simplify PrintSinkFunctionTest
pnowojski commented on a change in pull request #6538: [hotfix][streaming] Fix and simplify PrintSinkFunctionTest URL: https://github.com/apache/flink/pull/6538#discussion_r209296461 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ## @@ -20,106 +20,84 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkContextUtil; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.util.MockStreamingRuntimeContext; import org.junit.After; -import org.junit.Assert; +import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; import java.io.ByteArrayOutputStream; import java.io.PrintStream; import static org.junit.Assert.assertEquals; /** - * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}. + * Tests for the {@link PrintSinkFunction}. 
*/ public class PrintSinkFunctionTest { - public PrintStream printStreamOriginal = System.out; - private String line = System.lineSeparator(); + private final PrintStream originalSystemOut = System.out; + private final PrintStream originalSystemErr = System.err; - @Test - public void testPrintSinkStdOut() throws Exception { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PrintStream stream = new PrintStream(baos); - System.setOut(stream); + private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); + private final ByteArrayOutputStream arrayErrorStream = new ByteArrayOutputStream(); - final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class); + private final String line = System.lineSeparator(); - PrintSinkFunction printSink = new PrintSinkFunction<>(); - printSink.setRuntimeContext(ctx); - try { - printSink.open(new Configuration()); - } catch (Exception e) { - Assert.fail(); + @Before + public void setUp() { + System.setOut(new PrintStream(arrayOutputStream)); + System.setErr(new PrintStream(arrayErrorStream)); + } + + @After + public void tearDown() { + if (System.out != originalSystemOut) { + System.out.close(); } + if (System.err != originalSystemErr) { + System.err.close(); + } + System.setOut(originalSystemOut); + System.setErr(originalSystemErr); + } + + @Test + public void testPrintSinkStdOut() throws Exception { + PrintSinkFunction printSink = new PrintSinkFunction<>(); + printSink.setRuntimeContext(new MockStreamingRuntimeContext(false, 1, 0)); printSink.setTargetToStandardOut(); + printSink.open(new Configuration()); + printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0)); assertEquals("Print to System.out", printSink.toString()); - assertEquals("hello world!" + line, baos.toString()); - - printSink.close(); Review comment: Btw, is there a reason why `RichFunction` doesn't implement `AutoClosable`? This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10123) Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client
[ https://issues.apache.org/jira/browse/FLINK-10123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576433#comment-16576433 ] ASF GitHub Bot commented on FLINK-10123: tillrohrmann opened a new pull request #6539: [FLINK-10123] Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client URL: https://github.com/apache/flink/pull/6539 ## What is the purpose of the change Using the ExecutorThreadFactory hardens the system because it uses the FatalExitExceptionHandler as UncaughtExceptionHandler which terminates the JVM in case of an exception. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client > -- > > Key: FLINK-10123 > URL: https://issues.apache.org/jira/browse/FLINK-10123 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Instead of using the {{DefaultThreadFactory}} in the > {{RestServerEndpoint}}/{{RestClient}} we should use the > {{ExecutorThreadFactory}} because it uses the {{FatalExitExceptionHandler}} > per default as the uncaught exception handler. This should guard against > uncaught exceptions by simply terminating the JVM. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10123) Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client
[ https://issues.apache.org/jira/browse/FLINK-10123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10123: --- Labels: pull-request-available (was: ) > Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client > -- > > Key: FLINK-10123 > URL: https://issues.apache.org/jira/browse/FLINK-10123 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Instead of using the {{DefaultThreadFactory}} in the > {{RestServerEndpoint}}/{{RestClient}} we should use the > {{ExecutorThreadFactory}} because it uses the {{FatalExitExceptionHandler}} > per default as the uncaught exception handler. This should guard against > uncaught exceptions by simply terminating the JVM. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10123) Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client
Till Rohrmann created FLINK-10123: - Summary: Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client Key: FLINK-10123 URL: https://issues.apache.org/jira/browse/FLINK-10123 Project: Flink Issue Type: Improvement Components: REST Affects Versions: 1.6.0, 1.7.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.7.0 Instead of using the {{DefaultThreadFactory}} in the {{RestServerEndpoint}}/{{RestClient}} we should use the {{ExecutorThreadFactory}} because it uses the {{FatalExitExceptionHandler}} per default as the uncaught exception handler. This should guard against uncaught exceptions by simply terminating the JVM. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
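The difference between the two factories boils down to which uncaught exception handler newly created threads get. A minimal sketch of the idea follows; the class name `ExitingThreadFactory` and the pool name are illustrative, not Flink's actual `ExecutorThreadFactory`, which wires in its `FatalExitExceptionHandler` by default.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a thread factory that installs an uncaught exception handler
// on every thread it creates, so a dying REST server/client thread cannot
// fail silently. Names are illustrative, not Flink's implementation.
final class ExitingThreadFactory implements ThreadFactory {
    private final String poolName;
    private final Thread.UncaughtExceptionHandler handler;
    private final AtomicInteger counter = new AtomicInteger(1);

    ExitingThreadFactory(String poolName, Thread.UncaughtExceptionHandler handler) {
        this.poolName = poolName;
        this.handler = handler;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread t = new Thread(runnable, poolName + "-thread-" + counter.getAndIncrement());
        t.setDaemon(true);
        // e.g. a handler that terminates the JVM on any uncaught exception
        t.setUncaughtExceptionHandler(handler);
        return t;
    }
}
```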
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576424#comment-16576424 ] ASF GitHub Bot commented on FLINK-9913: --- pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r209284512 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -188,24 +192,32 @@ public void setMetricGroup(TaskIOMetricGroup metrics) { /** * Marks the current {@link BufferBuilder} as finished and clears the state for next one. -* -* @return true if some data were written */ - private boolean tryFinishCurrentBufferBuilder(int targetChannel, RecordSerializer serializer) { - - if (!bufferBuilders[targetChannel].isPresent()) { - return false; + private void tryFinishCurrentBufferBuilder(int targetChannel) { + if (bufferBuilders[targetChannel].isPresent()) { + BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get(); + bufferBuilders[targetChannel] = Optional.empty(); + numBytesOut.inc(bufferBuilder.finish()); } - BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get(); - bufferBuilders[targetChannel] = Optional.empty(); + } - numBytesOut.inc(bufferBuilder.finish()); - serializer.clear(); - return true; + /** +* The {@link BufferBuilder} may already exist if not filled up last time, otherwise we need +* request a new one for this target channel. +*/ + @Nonnull Review comment: Imo you don't have to add `@Nonnull` annotation. I'm implicitly assuming that any non `@Nullable` marked field is automatically `@Nonnull`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve output serialization only once in RecordWriter > -- > > Key: FLINK-9913 > URL: https://issues.apache.org/jira/browse/FLINK-9913 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.6.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently the {{RecordWriter}} emits output into multi channels via > {{ChannelSelector}} or broadcasts output to all channels directly. Each > channel has a separate {{RecordSerializer}} for serializing outputs, that > means the output will be serialized as many times as the number of selected > channels. > As we know, data serialization is a high cost operation, so we can get good > benefits by improving the serialization only once. > I would suggest the following changes for realizing it. > # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the > channels. > # The output is serialized into the intermediate data buffer only once for > different channels. > # The intermediate serialization results are copied into different > {{BufferBuilder}}s for different channels. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
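The three changes proposed above (one serializer, serialize once into an intermediate buffer, copy the bytes per channel) can be illustrated with a toy writer. This is a simplified stand-in, not Flink's `RecordWriter`/`BufferBuilder` API: the record is turned into bytes exactly once, and only the bytes are copied to each channel.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy illustration of the serialize-once scheme: one serialization step,
// then a cheap byte copy into every channel's buffer.
final class BroadcastingWriter {
    private final List<ByteBuffer> channelBuffers;

    BroadcastingWriter(int numChannels, int bufferSize) {
        channelBuffers = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) {
            channelBuffers.add(ByteBuffer.allocate(bufferSize));
        }
    }

    void broadcastEmit(String record) {
        // Serialize exactly once into an intermediate byte array...
        byte[] serialized = record.getBytes(StandardCharsets.UTF_8);
        // ...then only copy the bytes into every channel's buffer.
        for (ByteBuffer target : channelBuffers) {
            target.put(serialized);
        }
    }

    byte[] read(int channel) {
        ByteBuffer b = channelBuffers.get(channel);
        byte[] out = new byte[b.position()];
        b.rewind();
        b.get(out);
        return out;
    }
}
```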
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576425#comment-16576425 ] ASF GitHub Bot commented on FLINK-9913: --- pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r209290889 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java ## @@ -66,29 +66,33 @@ public boolean isFullBuffer() { } /** -* Starts serializing and copying the given record to the target buffer -* (if available). +* Starts serializing the given record to an intermediate data buffer. * * @param record the record to serialize -* @return how much information was written to the target buffer and -* whether this buffer is full */ - SerializationResult addRecord(T record) throws IOException; + void serializeRecord(T record) throws IOException; Review comment: I'm thinking about refactoring this class and splitting it into two: ``` class RecordSerializer { SerializedRecord serializeRecord(T record); }; class SerializedRecord implements AutoCloseable { CopyingResult copyToBufferBuilder(BufferBuilder bufferBuilder); void close() { serializer.prune(); // and code to return state (serializationBuffer) to serializer for reuse } } ``` and usage: ``` public void randomEmit(T record) throws IOException, InterruptedException { try (SerializedRecord serializedRecord = serializer.serializeRecord(record)) { copyToTarget(serializedRecord, rng.nextInt(numChannels)); } } ``` Something about the current `RecordSerializer` has always been confusing to me, and I have to check its implementation whenever I revisit the code. Maybe with this split it would be easier to understand? But I'm not sure about this. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve output serialization only once in RecordWriter > -- > > Key: FLINK-9913 > URL: https://issues.apache.org/jira/browse/FLINK-9913 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.6.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently the {{RecordWriter}} emits output into multi channels via > {{ChannelSelector}} or broadcasts output to all channels directly. Each > channel has a separate {{RecordSerializer}} for serializing outputs, that > means the output will be serialized as many times as the number of selected > channels. > As we know, data serialization is a high cost operation, so we can get good > benefits by improving the serialization only once. > I would suggest the following changes for realizing it. > # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the > channels. > # The output is serialized into the intermediate data buffer only once for > different channels. > # The intermediate serialization results are copied into different > {{BufferBuilder}}s for different channels. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
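A runnable version of the split proposed in that review comment might look like this. The class names follow the comment's pseudocode, but the serializer state and the `BufferBuilder` stand-in are simplified assumptions, so treat it as a sketch of the try-with-resources shape rather than a drop-in design.

```java
import java.nio.charset.StandardCharsets;

// Sketch of the proposed split: serializing yields a SerializedRecord,
// and try-with-resources guarantees the intermediate buffer is pruned
// (returned to the serializer) once all copies are done.
final class RecordSerializer {
    boolean pruned = false;

    SerializedRecord serializeRecord(String record) {
        pruned = false;
        return new SerializedRecord(this, record.getBytes(StandardCharsets.UTF_8));
    }
}

final class SerializedRecord implements AutoCloseable {
    private final RecordSerializer owner;
    private final byte[] bytes;

    SerializedRecord(RecordSerializer owner, byte[] bytes) {
        this.owner = owner;
        this.bytes = bytes;
    }

    // StringBuilder is a stand-in for Flink's BufferBuilder here.
    int copyToBufferBuilder(StringBuilder bufferBuilder) {
        bufferBuilder.append(new String(bytes, StandardCharsets.UTF_8));
        return bytes.length;
    }

    @Override
    public void close() {
        owner.pruned = true; // return intermediate state to the serializer
    }
}
```

The design point being illustrated: making the serialized record `AutoCloseable` moves the easy-to-forget `prune()` call out of every emit path and into the language's resource handling.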
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576422#comment-16576422 ] ASF GitHub Bot commented on FLINK-9913: --- pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r209290136 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ## @@ -524,6 +615,31 @@ public void read(DataInputView in) throws IOException { } } + /** +* Broadcast channel selector that selects all the output channels. +*/ + private static class Broadcast implements ChannelSelector { + + private int[] returnChannel; + boolean set; Review comment: 1. do we need to cache `returnChannel`? Does it give any meaningful test execution speed up? 2. if so, instead of using `set` and `setNumber`, just check whether `returnChannel.length == numberOfOutputChannels`. If not, create new one. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve output serialization only once in RecordWriter > -- > > Key: FLINK-9913 > URL: https://issues.apache.org/jira/browse/FLINK-9913 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.6.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently the {{RecordWriter}} emits output into multi channels via > {{ChannelSelector}} or broadcasts output to all channels directly. Each > channel has a separate {{RecordSerializer}} for serializing outputs, that > means the output will be serialized as many times as the number of selected > channels. 
> As we know, data serialization is a high cost operation, so we can get good > benefits by improving the serialization only once. > I would suggest the following changes for realizing it. > # Only one {{RecordSerializer}} is created in {{RecordWriter}} for all the > channels. > # The output is serialized into the intermediate data buffer only once for > different channels. > # The intermediate serialization results are copied into different > {{BufferBuilder}}s for different channels. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
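The reviewer's second suggestion above (rebuild the cached channel array only when the channel count changes, instead of tracking a separate `set` flag) can be sketched as follows. The real `ChannelSelector` interface has a different signature, so this is a standalone illustration of the caching check only.

```java
// Sketch of the suggested Broadcast selector: the cached array is rebuilt
// only when the requested channel count differs from the cached length,
// which makes the separate `set`/`setNumber` bookkeeping unnecessary.
final class Broadcast {
    private int[] returnChannel = new int[0];

    int[] selectChannels(int numberOfOutputChannels) {
        if (returnChannel.length != numberOfOutputChannels) {
            returnChannel = new int[numberOfOutputChannels];
            for (int i = 0; i < numberOfOutputChannels; i++) {
                returnChannel[i] = i;
            }
        }
        return returnChannel;
    }
}
```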
[jira] [Commented] (FLINK-9913) Improve output serialization only once in RecordWriter
[ https://issues.apache.org/jira/browse/FLINK-9913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576423#comment-16576423 ] ASF GitHub Bot commented on FLINK-9913: --- pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r209289603 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ## @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws Exception { assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex()); } + /** +* Tests that records are broadcast via {@link ChannelSelector} and +* {@link RecordWriter#emit(IOReadableWritable)}. +*/ + @Test + public void testEmitRecordWithBroadcastPartitioner() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false); + } + + /** +* Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}. +*/ + @Test + public void testBroadcastEmitRecord() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true); + } + + /** +* The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same, +* that is all the target channels can receive the whole outputs. 
+* +* @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not +*/ + private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception { + final int numChannels = 4; + final int bufferSize = 32; + final int numValues = 8; + final int serializationLength = 4; + + @SuppressWarnings("unchecked") + final Queue[] queues = new Queue[numChannels]; + for (int i = 0; i < numChannels; i++) { + queues[i] = new ArrayDeque<>(); + } + + final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize); + final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider); + final RecordWriter writer = new RecordWriter<>(partitionWriter, new Broadcast<>()); + final RecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>( + new String[]{ tempFolder.getRoot().getAbsolutePath() }); + + final ArrayDeque serializedRecords = new ArrayDeque<>(); + final Iterable records = Util.randomRecords(numValues, SerializationTestTypeFactory.INT); + for (SerializationTestType record : records) { + serializedRecords.add(record); + + if (isBroadcastEmit) { + writer.broadcastEmit(record); + } else { + writer.emit(record); + } + } + + final int requiredBuffers = numValues / (bufferSize / (4 + serializationLength)); + for (int i = 0; i < numChannels; i++) { Review comment: can you somehow extract common logic of this method and `org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest#testSerializationRoundTrip(java.lang.Iterable, int, org.apache.flink.runtime.io.network.api.serialization.RecordSerializer, org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer)`? They share a lot of core. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve output serialization only once in RecordWriter > -- > > Key: FLINK-9913 > URL: https://issues.apache.org/jira/browse/FLINK-9913 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.6.0 >Reporter: zhijiang >Assignee: zhijiang >Priority: Minor > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently the {{RecordWriter}} emits output into multi channels via > {{ChannelSelector}} or broadcasts output to all channels directly. Each > channel has a separate {{RecordSerializer}} for serializing
[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r209290889 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java ## @@ -66,29 +66,33 @@ public boolean isFullBuffer() { } /** -* Starts serializing and copying the given record to the target buffer -* (if available). +* Starts serializing the given record to an intermediate data buffer. * * @param record the record to serialize -* @return how much information was written to the target buffer and -* whether this buffer is full */ - SerializationResult addRecord(T record) throws IOException; + void serializeRecord(T record) throws IOException; Review comment: I'm thinking about refactoring this class and splitting it into two: ``` class RecordSerializer { SerializedRecord serializeRecord(T record); }; class SerializedRecord implements Autoclosable { CopingResult copyToBufferBuilder(BufferBuilder bufferBuilder); void close() { serializer.prune(); // and code to return state (serializationBuffer) to serializer for reuse } } ``` and usage: ``` public void randomEmit(T record) throws IOException, InterruptedException { try (SerializedRecord serializedRecord = serializer.serializeRecord(record)) { copyToTarget(serializedRecord, rng.nextInt(numChannels)); } } ``` somehow always was/is tickling my brain in current `RecordSerializer` is confusing to me and I have to always check it's implementation whenever I revisit the code. Maybe with this split it would be easier to understand? But I'm not sure about this. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r209290136 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ## @@ -524,6 +615,31 @@ public void read(DataInputView in) throws IOException { } } + /** +* Broadcast channel selector that selects all the output channels. +*/ + private static class Broadcast implements ChannelSelector { + + private int[] returnChannel; + boolean set; Review comment: 1. do we need to cache `returnChannel`? Does it give any meaningful test execution speed up? 2. if so, instead of using `set` and `setNumber`, just check whether `returnChannel.length == numberOfOutputChannels`. If not, create new one. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r209284512 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ## @@ -188,24 +192,32 @@ public void setMetricGroup(TaskIOMetricGroup metrics) { /** * Marks the current {@link BufferBuilder} as finished and clears the state for next one. -* -* @return true if some data were written */ - private boolean tryFinishCurrentBufferBuilder(int targetChannel, RecordSerializer serializer) { - - if (!bufferBuilders[targetChannel].isPresent()) { - return false; + private void tryFinishCurrentBufferBuilder(int targetChannel) { + if (bufferBuilders[targetChannel].isPresent()) { + BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get(); + bufferBuilders[targetChannel] = Optional.empty(); + numBytesOut.inc(bufferBuilder.finish()); } - BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get(); - bufferBuilders[targetChannel] = Optional.empty(); + } - numBytesOut.inc(bufferBuilder.finish()); - serializer.clear(); - return true; + /** +* The {@link BufferBuilder} may already exist if not filled up last time, otherwise we need +* request a new one for this target channel. +*/ + @Nonnull Review comment: Imo you don't have to add `@Nonnull` annotation. I'm implicitly assuming that any non `@Nullable` marked field is automatically `@Nonnull`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter
pnowojski commented on a change in pull request #6417: [FLINK-9913][runtime] Improve output serialization only once in RecordWriter URL: https://github.com/apache/flink/pull/6417#discussion_r209289603 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ## @@ -377,6 +388,86 @@ public void testBroadcastEmitBufferIndependence() throws Exception { assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex()); } + /** +* Tests that records are broadcast via {@link ChannelSelector} and +* {@link RecordWriter#emit(IOReadableWritable)}. +*/ + @Test + public void testEmitRecordWithBroadcastPartitioner() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(false); + } + + /** +* Tests that records are broadcast via {@link RecordWriter#broadcastEmit(IOReadableWritable)}. +*/ + @Test + public void testBroadcastEmitRecord() throws Exception { + emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(true); + } + + /** +* The results of emitting records via BroadcastPartitioner or broadcasting records directly are the same, +* that is all the target channels can receive the whole outputs. 
+* +* @param isBroadcastEmit whether using {@link RecordWriter#broadcastEmit(IOReadableWritable)} or not +*/ + private void emitRecordWithBroadcastPartitionerOrBroadcastEmitRecord(boolean isBroadcastEmit) throws Exception { + final int numChannels = 4; + final int bufferSize = 32; + final int numValues = 8; + final int serializationLength = 4; + + @SuppressWarnings("unchecked") + final Queue[] queues = new Queue[numChannels]; + for (int i = 0; i < numChannels; i++) { + queues[i] = new ArrayDeque<>(); + } + + final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize); + final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider); + final RecordWriter writer = new RecordWriter<>(partitionWriter, new Broadcast<>()); + final RecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>( + new String[]{ tempFolder.getRoot().getAbsolutePath() }); + + final ArrayDeque serializedRecords = new ArrayDeque<>(); + final Iterable records = Util.randomRecords(numValues, SerializationTestTypeFactory.INT); + for (SerializationTestType record : records) { + serializedRecords.add(record); + + if (isBroadcastEmit) { + writer.broadcastEmit(record); + } else { + writer.emit(record); + } + } + + final int requiredBuffers = numValues / (bufferSize / (4 + serializationLength)); + for (int i = 0; i < numChannels; i++) { Review comment: can you somehow extract common logic of this method and `org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest#testSerializationRoundTrip(java.lang.Iterable, int, org.apache.flink.runtime.io.network.api.serialization.RecordSerializer, org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer)`? They share a lot of code.
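The shared logic the reviewer wants to extract is a serialize-then-deserialize round trip asserted against the original records. A minimal, self-contained sketch of such a shared test helper, using plain `java.io` streams in place of Flink's record serializers (all names here are illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Sketch of a round-trip helper of the kind both tests could share:
// serialize the records, deserialize them back, return what a reader sees.
public class RoundTripSketch {

    static List<Integer> roundTrip(List<Integer> records) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(records.size());
            for (int record : records) {
                out.writeInt(record);
            }
            out.flush();

            DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray()));
            int count = in.readInt();
            List<Integer> result = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                result.add(in.readInt());
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A caller would assert `roundTrip(records).equals(records)`; in the Flink tests the same shape would be parameterized over the serializer, deserializer, and buffer size instead of hard-coding `DataOutputStream`.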
[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes
[ https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576399#comment-16576399 ] ASF GitHub Bot commented on FLINK-10041: azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner … URL: https://github.com/apache/flink/pull/6501#discussion_r209288519 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java ## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state.iterator; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; + +/** + * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups. + * The resulting iteration sequence is ordered by (key-group, kv-state). 
+ */ +public class RocksStatesPerKeyGroupMergeIterator implements AutoCloseable { + + private final PriorityQueue heap; + private final int keyGroupPrefixByteCount; + private boolean newKeyGroup; + private boolean newKVState; + private boolean valid; + private RocksSingleStateIterator currentSubIterator; + + private static final List> COMPARATORS; + + static { + int maxBytes = 2; + COMPARATORS = new ArrayList<>(maxBytes); + for (int i = 0; i < maxBytes; ++i) { + final int currentBytes = i + 1; + COMPARATORS.add((o1, o2) -> { + int arrayCmpRes = compareKeyGroupsForByteArrays( + o1.getCurrentKey(), o2.getCurrentKey(), currentBytes); + return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes; + }); + } + } + + public RocksStatesPerKeyGroupMergeIterator( + List> kvStateIterators, + final int keyGroupPrefixByteCount) { + Preconditions.checkNotNull(kvStateIterators); + Preconditions.checkArgument(keyGroupPrefixByteCount >= 1); + + this.keyGroupPrefixByteCount = keyGroupPrefixByteCount; + + Comparator iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount - 1); + + if (kvStateIterators.size() > 0) { + PriorityQueue iteratorPriorityQueue = + new PriorityQueue<>(kvStateIterators.size(), iteratorComparator); + + for (Tuple2 rocksIteratorWithKVStateId : kvStateIterators) { + final RocksIteratorWrapper rocksIterator = rocksIteratorWithKVStateId.f0; + rocksIterator.seekToFirst(); + if (rocksIterator.isValid()) { + iteratorPriorityQueue.offer(new RocksSingleStateIterator(rocksIterator, rocksIteratorWithKVStateId.f1)); + } else { + IOUtils.closeQuietly(rocksIterator); + } + } + + kvStateIterators.clear(); + + this.heap = iteratorPriorityQueue; + this.valid = !heap.isEmpty(); + this.currentSubIterator = heap.poll(); + } else { + // creating a PriorityQueue of size 0 results in an exception. + this.heap = null; + this.valid = false; + } + + this.newKeyGroup = true; + this.newKVState = true; + } + + /** +* Advance the iterator. 
Should only be called if {@link #isValid()} returned true.
[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes
[ https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576402#comment-16576402 ] ASF GitHub Bot commented on FLINK-10041: azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner … URL: https://github.com/apache/flink/pull/6501#discussion_r209283620 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java ## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state.iterator; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils; +import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nonnull; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Objects; + +/** + * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys. This class + * is not thread safe. + * + * @param the type of the iterated objects, which are keys in RocksDB. + */ +public class RocksStateKeysIterator implements Iterator, AutoCloseable { + + @Nonnull + private final RocksIteratorWrapper iterator; + + @Nonnull + private final String state; + + @Nonnull + private final TypeSerializer keySerializer; + + @Nonnull + private final byte[] namespaceBytes; + + private final boolean ambiguousKeyPossible; + private final int keyGroupPrefixBytes; + private K nextKey; + private K previousKey; + + public RocksStateKeysIterator( + @Nonnull RocksIteratorWrapper iterator, + @Nonnull String state, + @Nonnull TypeSerializer keySerializer, + int keyGroupPrefixBytes, + boolean ambiguousKeyPossible, + @Nonnull byte[] namespaceBytes) { + this.iterator = iterator; + this.state = state; + this.keySerializer = keySerializer; + this.keyGroupPrefixBytes = keyGroupPrefixBytes; + this.namespaceBytes = namespaceBytes; + this.nextKey = null; + this.previousKey = null; + this.ambiguousKeyPossible = ambiguousKeyPossible; + } + + @Override + public boolean hasNext() { + try { + while (nextKey == null && iterator.isValid()) { + + byte[] key = iterator.key(); + + ByteArrayInputStreamWithPos inputStream = + new ByteArrayInputStreamWithPos(key, 
keyGroupPrefixBytes, key.length - keyGroupPrefixBytes); + + DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(inputStream); + + K value = RocksDBKeySerializationUtils.readKey( Review comment: value -> currentKey This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Extract all different iterators (inner or static inner classes) into full > classes > - > > Key: FLINK-10041 > URL: https://issues.apache.org/jira/browse/FLINK-10041 > Project: Flink > Issue Type: Sub-task >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available >
[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes
[ https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576400#comment-16576400 ] ASF GitHub Bot commented on FLINK-10041: azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner … URL: https://github.com/apache/flink/pull/6501#discussion_r209280247 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state.iterator; + +import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper; +import org.apache.flink.util.IOUtils; + +import javax.annotation.Nonnull; + +/** + * Wraps a RocksDB iterator to cache it's current key and assigns an id for the key/value state to the iterator. + * Used by {@link RocksStatesPerKeyGroupMergeIterator}. + */ +class RocksSingleStateIterator implements AutoCloseable { + + /** +* @param iterator The #RocksIterator to wrap . 
Review comment: Maybe: `@param iterator underlying {@link RocksIteratorWrapper}` otherwise it is like wrapper to wrap wrapper :)
[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes
[ https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576401#comment-16576401 ] ASF GitHub Bot commented on FLINK-10041: azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner … URL: https://github.com/apache/flink/pull/6501#discussion_r209290925 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksTransformingIteratorWrapper.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state.iterator; + +import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper; +import org.apache.flink.runtime.state.StateSnapshotTransformer; + +import org.rocksdb.RocksIterator; + +import javax.annotation.Nonnull; + +/** + * Wrapper around {@link RocksIteratorWrapper} that applies a given {@link StateSnapshotTransformer} to the elements Review comment: Wrapper around {@link *RocksIterator*} This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes
[ https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576403#comment-16576403 ] ASF GitHub Bot commented on FLINK-10041: azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner … URL: https://github.com/apache/flink/pull/6501#discussion_r209287458 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java ## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state.iterator; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; + +/** + * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups. + * The resulting iteration sequence is ordered by (key-group, kv-state). 
+ */ +public class RocksStatesPerKeyGroupMergeIterator implements AutoCloseable { + + private final PriorityQueue heap; + private final int keyGroupPrefixByteCount; + private boolean newKeyGroup; + private boolean newKVState; + private boolean valid; + private RocksSingleStateIterator currentSubIterator; + + private static final List> COMPARATORS; + + static { + int maxBytes = 2; + COMPARATORS = new ArrayList<>(maxBytes); + for (int i = 0; i < maxBytes; ++i) { + final int currentBytes = i + 1; + COMPARATORS.add((o1, o2) -> { + int arrayCmpRes = compareKeyGroupsForByteArrays( + o1.getCurrentKey(), o2.getCurrentKey(), currentBytes); + return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes; + }); + } + } + + public RocksStatesPerKeyGroupMergeIterator( + List> kvStateIterators, + final int keyGroupPrefixByteCount) { + Preconditions.checkNotNull(kvStateIterators); + Preconditions.checkArgument(keyGroupPrefixByteCount >= 1); + + this.keyGroupPrefixByteCount = keyGroupPrefixByteCount; + + Comparator iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount - 1); + + if (kvStateIterators.size() > 0) { + PriorityQueue iteratorPriorityQueue = Review comment: could be also separate method like: `PriorityQueue fillQueue(... kvStateIterators)` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Extract all different iterators (inner or static inner classes) into full > classes > - > > Key: FLINK-10041 > URL: https://issues.apache.org/jira/browse/FLINK-10041 > Project: Flink > Issue Type: Sub-task >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
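The `fillQueue(...)` extraction the reviewer proposes can be sketched with a simplified stand-in for `RocksSingleStateIterator` (here `SubIterator`, holding a positioned key iterator plus its kv-state id); the types and names are illustrative, not Flink's:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class FillQueueSketch {

    // Stand-in for RocksSingleStateIterator: a positioned iterator plus its kv-state id.
    static class SubIterator {
        final Iterator<Integer> keys;
        final int kvStateId;
        Integer currentKey;

        SubIterator(Iterator<Integer> keys, int kvStateId) {
            this.keys = keys;
            this.kvStateId = kvStateId;
            this.currentKey = keys.next(); // caller guarantees a first element
        }
    }

    // The extracted method the reviewer suggests: build the priority queue from
    // the raw iterator list, skipping (in Flink: quietly closing) empty ones.
    static PriorityQueue<SubIterator> fillQueue(
            List<Map.Entry<List<Integer>, Integer>> kvStateIterators) {
        Comparator<SubIterator> byKeyThenStateId = Comparator
            .comparing((SubIterator s) -> s.currentKey)
            .thenComparingInt(s -> s.kvStateId);
        PriorityQueue<SubIterator> queue =
            new PriorityQueue<>(Math.max(1, kvStateIterators.size()), byKeyThenStateId);
        for (Map.Entry<List<Integer>, Integer> entry : kvStateIterators) {
            Iterator<Integer> it = entry.getKey().iterator();
            if (it.hasNext()) {            // mirrors rocksIterator.isValid()
                queue.offer(new SubIterator(it, entry.getValue()));
            }
            // an invalid iterator would be closed quietly here
        }
        kvStateIterators.clear();
        return queue;
    }
}
```

Pulling this loop out of the constructor would also remove the need for the `heap = null` branch to be interleaved with the queue-building code.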
[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes
[ https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576398#comment-16576398 ] ASF GitHub Bot commented on FLINK-10041: azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner … URL: https://github.com/apache/flink/pull/6501#discussion_r209283561 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java ## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state.iterator; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils; +import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nonnull; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Objects; + +/** + * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys. This class + * is not thread safe. + * + * @param the type of the iterated objects, which are keys in RocksDB. + */ +public class RocksStateKeysIterator implements Iterator, AutoCloseable { + + @Nonnull + private final RocksIteratorWrapper iterator; + + @Nonnull + private final String state; + + @Nonnull + private final TypeSerializer keySerializer; + + @Nonnull + private final byte[] namespaceBytes; + + private final boolean ambiguousKeyPossible; + private final int keyGroupPrefixBytes; + private K nextKey; + private K previousKey; + + public RocksStateKeysIterator( + @Nonnull RocksIteratorWrapper iterator, + @Nonnull String state, + @Nonnull TypeSerializer keySerializer, + int keyGroupPrefixBytes, + boolean ambiguousKeyPossible, + @Nonnull byte[] namespaceBytes) { + this.iterator = iterator; + this.state = state; + this.keySerializer = keySerializer; + this.keyGroupPrefixBytes = keyGroupPrefixBytes; + this.namespaceBytes = namespaceBytes; + this.nextKey = null; + this.previousKey = null; + this.ambiguousKeyPossible = ambiguousKeyPossible; + } + + @Override + public boolean hasNext() { + try { + while (nextKey == null && iterator.isValid()) { + + byte[] key = iterator.key(); + + ByteArrayInputStreamWithPos inputStream = Review comment: I would suggest to move 
deserialisation stuff into a smaller method, like: `Tuple2 deserKeyAndNamespacePos(byte[] keyBytes)`
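The extraction suggested here, pulling the "skip the key-group prefix, then deserialize the key" logic out of `hasNext()`, can be sketched with plain `java.io` streams standing in for Flink's `ByteArrayInputStreamWithPos` and key serialization utilities (the `encode` helper and string keys are illustrative assumptions):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class KeyDeserSketch {

    // Illustrative encoder: prefixLen placeholder bytes, then the UTF-encoded key.
    static byte[] encode(int prefixLen, String key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            for (int i = 0; i < prefixLen; i++) {
                bytes.write(0); // fake key-group prefix byte
            }
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(key);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The extracted helper: skip the key-group prefix, deserialize the key bytes.
    static String deserializeKey(byte[] keyBytes, int keyGroupPrefixBytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(
                keyBytes, keyGroupPrefixBytes, keyBytes.length - keyGroupPrefixBytes));
            return in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In the real class the helper would additionally return the namespace position, which is why the reviewer's proposed signature returns a `Tuple2`.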
[jira] [Commented] (FLINK-10122) KafkaConsumer should use partitionable state over union state if partition discovery is not active
[ https://issues.apache.org/jira/browse/FLINK-10122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576396#comment-16576396 ] ASF GitHub Bot commented on FLINK-10122: StefanRRichter commented on a change in pull request #6537: [FLINK-10122] KafkaConsumer should use partitionable state over union state if partition discovery is not active URL: https://github.com/apache/flink/pull/6537#discussion_r209291672 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ## @@ -196,6 +196,20 @@ public void removeOperatorState(String name) { } } + public void deleteBroadCastState(String name) { Review comment: Argh...will remove the second one ;) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > KafkaConsumer should use partitionable state over union state if partition > discovery is not active > -- > > Key: FLINK-10122 > URL: https://issues.apache.org/jira/browse/FLINK-10122 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > KafkaConsumer store its offsets state always as union state. I think this is > only required in the case that partition discovery is active. For jobs with a > very high parallelism, the union state can lead to prohibitively expensive > deployments. For example, a job with 2000 source and a total of 10MB > checkpointed union state offsets state would have to ship ~ 2000 x 10MB = > 20GB of state. With partitionable state, it would have to ship ~10MB. > For now, I would suggest to go back to partitionable state in case that > partition discovery is not active. 
In the long run, I have some ideas for > more efficient partitioning schemes that would also work for active discovery.
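The size argument in the issue comes down to simple arithmetic: union state redistributes the full offsets state to every subtask, while partitionable state splits it. A back-of-the-envelope sketch of the quoted numbers (2000 subtasks, 10 MB of offsets state):

```java
// Rough model of the redistribution cost described in FLINK-10122.
// Units are megabytes; the model ignores per-entry overhead.
public class StateShippingMath {

    // Union state: every one of the `parallelism` subtasks receives the whole state.
    static long unionStateShippedMb(int parallelism, long totalStateMb) {
        return (long) parallelism * totalStateMb;
    }

    // Partitionable state: the state is split, so roughly the total is shipped once.
    static long partitionableStateShippedMb(long totalStateMb) {
        return totalStateMb;
    }
}
```

With `unionStateShippedMb(2000, 10)` the shipped volume is 20000 MB, i.e. the ~20 GB cited in the issue, versus ~10 MB for partitionable state.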
[GitHub] azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner …
azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner … URL: https://github.com/apache/flink/pull/6501#discussion_r209288519 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java ## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state.iterator; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; + +/** + * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups. + * The resulting iteration sequence is ordered by (key-group, kv-state). 
+ */ +public class RocksStatesPerKeyGroupMergeIterator implements AutoCloseable { + + private final PriorityQueue heap; + private final int keyGroupPrefixByteCount; + private boolean newKeyGroup; + private boolean newKVState; + private boolean valid; + private RocksSingleStateIterator currentSubIterator; + + private static final List> COMPARATORS; + + static { + int maxBytes = 2; + COMPARATORS = new ArrayList<>(maxBytes); + for (int i = 0; i < maxBytes; ++i) { + final int currentBytes = i + 1; + COMPARATORS.add((o1, o2) -> { + int arrayCmpRes = compareKeyGroupsForByteArrays( + o1.getCurrentKey(), o2.getCurrentKey(), currentBytes); + return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes; + }); + } + } + + public RocksStatesPerKeyGroupMergeIterator( + List> kvStateIterators, + final int keyGroupPrefixByteCount) { + Preconditions.checkNotNull(kvStateIterators); + Preconditions.checkArgument(keyGroupPrefixByteCount >= 1); + + this.keyGroupPrefixByteCount = keyGroupPrefixByteCount; + + Comparator iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount - 1); + + if (kvStateIterators.size() > 0) { + PriorityQueue iteratorPriorityQueue = + new PriorityQueue<>(kvStateIterators.size(), iteratorComparator); + + for (Tuple2 rocksIteratorWithKVStateId : kvStateIterators) { + final RocksIteratorWrapper rocksIterator = rocksIteratorWithKVStateId.f0; + rocksIterator.seekToFirst(); + if (rocksIterator.isValid()) { + iteratorPriorityQueue.offer(new RocksSingleStateIterator(rocksIterator, rocksIteratorWithKVStateId.f1)); + } else { + IOUtils.closeQuietly(rocksIterator); + } + } + + kvStateIterators.clear(); + + this.heap = iteratorPriorityQueue; + this.valid = !heap.isEmpty(); + this.currentSubIterator = heap.poll(); + } else { + // creating a PriorityQueue of size 0 results in an exception. + this.heap = null; + this.valid = false; + } + + this.newKeyGroup = true; + this.newKVState = true; + } + + /** +* Advance the iterator. 
Should only be called if {@link #isValid()} returned true. Valid can only chance after +* calls to {@link #next()}. Review comment: Typos: Valid flag can only *change* after calling {@link #next()}
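The static comparator table in the class under review orders sub-iterators first by the key-group prefix bytes of their current key and breaks ties with the kv-state id. A minimal plain-Java sketch of that ordering (class and member names here are illustrative stand-ins, not Flink's actual API):

```java
import java.util.Comparator;
import java.util.PriorityQueue;

public class KeyGroupMergeSketch {

    // Minimal stand-in for RocksSingleStateIterator: just the cached key and a state id.
    static final class Entry {
        final byte[] currentKey;
        final int kvStateId;
        Entry(byte[] currentKey, int kvStateId) {
            this.currentKey = currentKey;
            this.kvStateId = kvStateId;
        }
    }

    // Compare the first `prefixBytes` bytes of two keys as unsigned values.
    static int compareKeyGroupsForByteArrays(byte[] a, byte[] b, int prefixBytes) {
        for (int i = 0; i < prefixBytes; i++) {
            int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (diff != 0) {
                return diff;
            }
        }
        return 0;
    }

    // Key-group prefix first, kv-state id as the tie-breaker -- the ordering the
    // static comparator table encodes for 1- and 2-byte key-group prefixes.
    static Comparator<Entry> comparator(int prefixBytes) {
        return (o1, o2) -> {
            int cmp = compareKeyGroupsForByteArrays(o1.currentKey, o2.currentKey, prefixBytes);
            return cmp == 0 ? Integer.compare(o1.kvStateId, o2.kvStateId) : cmp;
        };
    }

    public static void main(String[] args) {
        PriorityQueue<Entry> heap = new PriorityQueue<>(comparator(2));
        heap.offer(new Entry(new byte[]{0, 2, 9}, 0));
        heap.offer(new Entry(new byte[]{0, 1, 5}, 1));
        heap.offer(new Entry(new byte[]{0, 1, 7}, 0));
        // Same key-group prefix {0,1}: state id 0 comes before state id 1.
        System.out.println(heap.poll().kvStateId); // prints 0
        System.out.println(heap.poll().kvStateId); // prints 1
    }
}
```

The sketch uses `Integer.compare` instead of the subtraction used in the quoted code; for small non-negative state ids both behave identically.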
[GitHub] azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner …
azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner … URL: https://github.com/apache/flink/pull/6501#discussion_r209283561 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java ## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state.iterator; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils; +import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nonnull; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Objects; + +/** + * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys. This class + * is not thread safe. 
+ * + * @param the type of the iterated objects, which are keys in RocksDB. + */ +public class RocksStateKeysIterator implements Iterator, AutoCloseable { + + @Nonnull + private final RocksIteratorWrapper iterator; + + @Nonnull + private final String state; + + @Nonnull + private final TypeSerializer keySerializer; + + @Nonnull + private final byte[] namespaceBytes; + + private final boolean ambiguousKeyPossible; + private final int keyGroupPrefixBytes; + private K nextKey; + private K previousKey; + + public RocksStateKeysIterator( + @Nonnull RocksIteratorWrapper iterator, + @Nonnull String state, + @Nonnull TypeSerializer keySerializer, + int keyGroupPrefixBytes, + boolean ambiguousKeyPossible, + @Nonnull byte[] namespaceBytes) { + this.iterator = iterator; + this.state = state; + this.keySerializer = keySerializer; + this.keyGroupPrefixBytes = keyGroupPrefixBytes; + this.namespaceBytes = namespaceBytes; + this.nextKey = null; + this.previousKey = null; + this.ambiguousKeyPossible = ambiguousKeyPossible; + } + + @Override + public boolean hasNext() { + try { + while (nextKey == null && iterator.isValid()) { + + byte[] key = iterator.key(); + + ByteArrayInputStreamWithPos inputStream = Review comment: I would suggest to move deserialisation stuff into smaller method, like: `Tuple2 deserKeyAndNamespacePos(byte[] keyBytes)` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner …
azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner … URL: https://github.com/apache/flink/pull/6501#discussion_r209280247 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state.iterator; + +import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper; +import org.apache.flink.util.IOUtils; + +import javax.annotation.Nonnull; + +/** + * Wraps a RocksDB iterator to cache it's current key and assigns an id for the key/value state to the iterator. + * Used by {@link RocksStatesPerKeyGroupMergeIterator}. + */ +class RocksSingleStateIterator implements AutoCloseable { + + /** +* @param iterator The #RocksIterator to wrap . Review comment: Maybe: `@param iterator underlying {@link RocksIteratorWrapper}` otherwise it is like wrapper to wrap wrapper :) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
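The wrapper discussed above caches the underlying iterator's current key (so comparisons need not call back into RocksDB) and tags it with a kv-state id. A minimal plain-Java analogue of that shape, with illustrative names in place of the real `RocksIteratorWrapper`:

```java
import java.util.Arrays;
import java.util.Iterator;

public class CachingIteratorSketch {

    // Caches the current element of an underlying iterator and carries an id,
    // mirroring how the reviewed class pairs a cached key with a kv-state id.
    static final class SingleStateIterator {
        private final Iterator<byte[]> delegate;
        private final int kvStateId;
        private byte[] currentKey;

        SingleStateIterator(Iterator<byte[]> delegate, int kvStateId) {
            this.delegate = delegate;
            this.kvStateId = kvStateId;
            this.currentKey = delegate.hasNext() ? delegate.next() : null;
        }

        byte[] getCurrentKey() { return currentKey; }
        int getKvStateId() { return kvStateId; }
        boolean isValid() { return currentKey != null; }

        // Advance and refresh the cached key.
        void next() { currentKey = delegate.hasNext() ? delegate.next() : null; }
    }

    public static void main(String[] args) {
        SingleStateIterator it = new SingleStateIterator(
            Arrays.asList(new byte[]{1}, new byte[]{2}).iterator(), 7);
        System.out.println(it.getCurrentKey()[0] + "/" + it.getKvStateId()); // prints 1/7
        it.next();
        System.out.println(it.getCurrentKey()[0]); // prints 2
        it.next();
        System.out.println(it.isValid()); // prints false
    }
}
```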
[GitHub] azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner …
azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner … URL: https://github.com/apache/flink/pull/6501#discussion_r209283620 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java ## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state.iterator; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils; +import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nonnull; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Objects; + +/** + * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys. This class + * is not thread safe. 
+ * + * @param the type of the iterated objects, which are keys in RocksDB. + */ +public class RocksStateKeysIterator implements Iterator, AutoCloseable { + + @Nonnull + private final RocksIteratorWrapper iterator; + + @Nonnull + private final String state; + + @Nonnull + private final TypeSerializer keySerializer; + + @Nonnull + private final byte[] namespaceBytes; + + private final boolean ambiguousKeyPossible; + private final int keyGroupPrefixBytes; + private K nextKey; + private K previousKey; + + public RocksStateKeysIterator( + @Nonnull RocksIteratorWrapper iterator, + @Nonnull String state, + @Nonnull TypeSerializer keySerializer, + int keyGroupPrefixBytes, + boolean ambiguousKeyPossible, + @Nonnull byte[] namespaceBytes) { + this.iterator = iterator; + this.state = state; + this.keySerializer = keySerializer; + this.keyGroupPrefixBytes = keyGroupPrefixBytes; + this.namespaceBytes = namespaceBytes; + this.nextKey = null; + this.previousKey = null; + this.ambiguousKeyPossible = ambiguousKeyPossible; + } + + @Override + public boolean hasNext() { + try { + while (nextKey == null && iterator.isValid()) { + + byte[] key = iterator.key(); + + ByteArrayInputStreamWithPos inputStream = + new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes); + + DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(inputStream); + + K value = RocksDBKeySerializationUtils.readKey( Review comment: value -> currentKey This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner …
azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract iterators from RocksDBKeyedStateBackend (inner … URL: https://github.com/apache/flink/pull/6501#discussion_r209287458 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java ## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state.iterator; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; + +/** + * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups. + * The resulting iteration sequence is ordered by (key-group, kv-state). 
+ */ +public class RocksStatesPerKeyGroupMergeIterator implements AutoCloseable { + + private final PriorityQueue heap; + private final int keyGroupPrefixByteCount; + private boolean newKeyGroup; + private boolean newKVState; + private boolean valid; + private RocksSingleStateIterator currentSubIterator; + + private static final List> COMPARATORS; + + static { + int maxBytes = 2; + COMPARATORS = new ArrayList<>(maxBytes); + for (int i = 0; i < maxBytes; ++i) { + final int currentBytes = i + 1; + COMPARATORS.add((o1, o2) -> { + int arrayCmpRes = compareKeyGroupsForByteArrays( + o1.getCurrentKey(), o2.getCurrentKey(), currentBytes); + return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes; + }); + } + } + + public RocksStatesPerKeyGroupMergeIterator( + List> kvStateIterators, + final int keyGroupPrefixByteCount) { + Preconditions.checkNotNull(kvStateIterators); + Preconditions.checkArgument(keyGroupPrefixByteCount >= 1); + + this.keyGroupPrefixByteCount = keyGroupPrefixByteCount; + + Comparator iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount - 1); + + if (kvStateIterators.size() > 0) { + PriorityQueue iteratorPriorityQueue = Review comment: could be also separate method like: `PriorityQueue fillQueue(... kvStateIterators)` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
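The `fillQueue` extraction proposed in the review -- building the priority queue from the iterator list in a dedicated method, keeping only valid entries -- can be sketched generically. Names and the validity predicate are illustrative; in the real code the elements would be the single-state iterators and invalid RocksDB iterators would additionally be closed quietly:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Predicate;

public class FillQueueSketch {

    // The extracted helper: seed the heap only with valid candidates.
    // Math.max(1, ...) sidesteps the fact that a PriorityQueue of capacity 0
    // throws, the same corner case the reviewed constructor guards against.
    static <T> PriorityQueue<T> fillQueue(List<T> candidates, Predicate<T> isValid, Comparator<T> cmp) {
        PriorityQueue<T> queue = new PriorityQueue<>(Math.max(1, candidates.size()), cmp);
        for (T candidate : candidates) {
            if (isValid.test(candidate)) {
                queue.offer(candidate);
            }
            // else: close/release the exhausted candidate here
        }
        candidates.clear(); // the caller no longer needs the raw list
        return queue;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>(Arrays.asList(3, -1, 2));
        PriorityQueue<Integer> q = fillQueue(input, v -> v >= 0, Comparator.naturalOrder());
        System.out.println(q.poll() + "," + q.poll()); // prints 2,3
        System.out.println(input.isEmpty()); // prints true
    }
}
```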
[GitHub] StefanRRichter commented on a change in pull request #6537: [FLINK-10122] KafkaConsumer should use partitionable state over union state if partition discovery is not active
StefanRRichter commented on a change in pull request #6537: [FLINK-10122] KafkaConsumer should use partitionable state over union state if partition discovery is not active URL: https://github.com/apache/flink/pull/6537#discussion_r209291672 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ## @@ -196,6 +196,20 @@ public void removeOperatorState(String name) { } } + public void deleteBroadCastState(String name) { Review comment: Argh...will remove the second one ;) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10122) KafkaConsumer should use partitionable state over union state if partition discovery is not active
[ https://issues.apache.org/jira/browse/FLINK-10122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576394#comment-16576394 ] ASF GitHub Bot commented on FLINK-10122: aljoscha commented on a change in pull request #6537: [FLINK-10122] KafkaConsumer should use partitionable state over union state if partition discovery is not active URL: https://github.com/apache/flink/pull/6537#discussion_r209291065 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ## @@ -196,6 +196,20 @@ public void removeOperatorState(String name) { } } + public void deleteBroadCastState(String name) { Review comment: I think you added these methods twice, once under `deleteBroadcastState` and once under `deleteBroadCastState`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > KafkaConsumer should use partitionable state over union state if partition > discovery is not active > -- > > Key: FLINK-10122 > URL: https://issues.apache.org/jira/browse/FLINK-10122 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > KafkaConsumer store its offsets state always as union state. I think this is > only required in the case that partition discovery is active. For jobs with a > very high parallelism, the union state can lead to prohibitively expensive > deployments. For example, a job with 2000 source and a total of 10MB > checkpointed union state offsets state would have to ship ~ 2000 x 10MB = > 20GB of state. With partitionable state, it would have to ship ~10MB. > For now, I would suggest to go back to partitionable state in case that > partition discovery is not active. 
In the long run, I have some ideas for > more efficient partitioning schemes that would also work for active discovery. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
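The issue's deployment-cost estimate (every one of 2000 subtasks receiving the full 10 MB union state on restore, versus each receiving only its own slice with partitionable state) is simple arithmetic; a quick runnable check:

```java
public class UnionStateCost {
    public static void main(String[] args) {
        int parallelism = 2000;
        long offsetsStateMb = 10; // total checkpointed offsets state

        // Union state: every subtask gets the complete state on restore.
        long unionShippedMb = parallelism * offsetsStateMb; // 20000 MB, i.e. ~20 GB

        // Partitionable (split) state: each subtask only gets its share,
        // so roughly the total state is shipped once.
        long splitShippedMb = offsetsStateMb;

        System.out.println(unionShippedMb + " MB vs " + splitShippedMb + " MB");
        // prints 20000 MB vs 10 MB
    }
}
```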
[GitHub] aljoscha commented on a change in pull request #6537: [FLINK-10122] KafkaConsumer should use partitionable state over union state if partition discovery is not active
aljoscha commented on a change in pull request #6537: [FLINK-10122] KafkaConsumer should use partitionable state over union state if partition discovery is not active URL: https://github.com/apache/flink/pull/6537#discussion_r209291065 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ## @@ -196,6 +196,20 @@ public void removeOperatorState(String name) { } } + public void deleteBroadCastState(String name) { Review comment: I think you added these methods twice, once under `deleteBroadcastState` and once under `deleteBroadCastState`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] aljoscha commented on a change in pull request #6538: [hotfix][streaming] Fix and simplify PrintSinkFunctionTest
aljoscha commented on a change in pull request #6538: [hotfix][streaming] Fix and simplify PrintSinkFunctionTest URL: https://github.com/apache/flink/pull/6538#discussion_r209283937 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ## @@ -20,106 +20,84 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkContextUtil; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.util.MockStreamingRuntimeContext; import org.junit.After; -import org.junit.Assert; +import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; import java.io.ByteArrayOutputStream; import java.io.PrintStream; import static org.junit.Assert.assertEquals; /** - * Tests for the {@link org.apache.flink.streaming.api.functions.sink.PrintSinkFunction}. + * Tests for the {@link PrintSinkFunction}. 
*/ public class PrintSinkFunctionTest { - public PrintStream printStreamOriginal = System.out; - private String line = System.lineSeparator(); + private final PrintStream originalSystemOut = System.out; + private final PrintStream originalSystemErr = System.err; - @Test - public void testPrintSinkStdOut() throws Exception { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PrintStream stream = new PrintStream(baos); - System.setOut(stream); + private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); + private final ByteArrayOutputStream arrayErrorStream = new ByteArrayOutputStream(); - final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class); + private final String line = System.lineSeparator(); - PrintSinkFunction printSink = new PrintSinkFunction<>(); - printSink.setRuntimeContext(ctx); - try { - printSink.open(new Configuration()); - } catch (Exception e) { - Assert.fail(); + @Before + public void setUp() { + System.setOut(new PrintStream(arrayOutputStream)); + System.setErr(new PrintStream(arrayErrorStream)); + } + + @After + public void tearDown() { + if (System.out != originalSystemOut) { + System.out.close(); } + if (System.err != originalSystemErr) { + System.err.close(); + } + System.setOut(originalSystemOut); + System.setErr(originalSystemErr); + } + + @Test + public void testPrintSinkStdOut() throws Exception { + PrintSinkFunction printSink = new PrintSinkFunction<>(); + printSink.setRuntimeContext(new MockStreamingRuntimeContext(false, 1, 0)); printSink.setTargetToStandardOut(); + printSink.open(new Configuration()); + printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0)); assertEquals("Print to System.out", printSink.toString()); - assertEquals("hello world!" + line, baos.toString()); - - printSink.close(); Review comment: we could still close the sink function This is an automated message from the Apache Git Service. 
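The test rework quoted above captures `System.out`/`System.err` into byte-array streams in `@Before` and restores the originals in `@After`. The core capture-and-restore pattern, stripped of JUnit and the Flink sink so it runs standalone, looks like this:

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class StdOutCaptureSketch {

    // Redirect System.out into a buffer, run the code under test, then restore
    // the original stream -- the same shape as the setUp/tearDown pair in the
    // reworked PrintSinkFunctionTest.
    public static String captureStdOut(Runnable code) {
        PrintStream original = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        System.setOut(new PrintStream(buffer));
        try {
            code.run();
        } finally {
            System.out.close(); // closing the replacement stream is safe; the buffer keeps its data
            System.setOut(original);
        }
        return buffer.toString();
    }

    public static void main(String[] args) {
        String captured = captureStdOut(() -> System.out.println("hello world!"));
        System.out.print(captured); // prints hello world! (plus the line separator)
    }
}
```

Restoring the streams in a `finally` block (or `@After`) is what keeps one failing test from silently swallowing the output of every test that runs after it.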
[jira] [Commented] (FLINK-10006) Improve logging in BarrierBuffer
[ https://issues.apache.org/jira/browse/FLINK-10006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576366#comment-16576366 ] ASF GitHub Bot commented on FLINK-10006: dawidwys commented on issue #6470: [FLINK-10006][network] improve logging in BarrierBuffer: prepend owning task name URL: https://github.com/apache/flink/pull/6470#issuecomment-412103541 +1, lgtm This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve logging in BarrierBuffer > > > Key: FLINK-10006 > URL: https://issues.apache.org/jira/browse/FLINK-10006 > Project: Flink > Issue Type: Improvement > Components: Network >Affects Versions: 1.5.2, 1.6.0, 1.7.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Labels: pull-request-available > > Almost all log messages of {{BarrierBuffer}} do not contain the task name and > are therefore of little use if either multiple slots are executed on a single > TM or multiple checkpoints run in parallel. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] dawidwys commented on issue #6470: [FLINK-10006][network] improve logging in BarrierBuffer: prepend owning task name
dawidwys commented on issue #6470: [FLINK-10006][network] improve logging in BarrierBuffer: prepend owning task name URL: https://github.com/apache/flink/pull/6470#issuecomment-412103541 +1, lgtm This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
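The change being approved prepends the owning task's name to each `BarrierBuffer` log message so that messages from different slots or concurrent checkpoints can be told apart. The pattern reduces to a trivial prefix (names here are illustrative):

```java
public class TaskPrefixedLogging {

    // Prefix every log line with the owning task's name, so messages from
    // different slots on the same TaskManager are distinguishable.
    static String withTaskName(String taskName, String message) {
        return taskName + ": " + message;
    }

    public static void main(String[] args) {
        System.out.println(withTaskName("Source (3/8)", "Received barrier for checkpoint 42"));
        // prints Source (3/8): Received barrier for checkpoint 42
    }
}
```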
[jira] [Commented] (FLINK-10109) Add documentation for StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576352#comment-16576352 ] ASF GitHub Bot commented on FLINK-10109: aljoscha commented on issue #6532: [FLINK-10109] Add documentation for StreamingFileSink URL: https://github.com/apache/flink/pull/6532#issuecomment-412101222 I think I addressed all comments, PTAL. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add documentation for StreamingFileSink > --- > > Key: FLINK-10109 > URL: https://issues.apache.org/jira/browse/FLINK-10109 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Affects Versions: 1.6.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] aljoscha commented on issue #6532: [FLINK-10109] Add documentation for StreamingFileSink
aljoscha commented on issue #6532: [FLINK-10109] Add documentation for StreamingFileSink URL: https://github.com/apache/flink/pull/6532#issuecomment-412101222 I think I addressed all comments, PTAL. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] asfgit closed pull request #6527: [hotfix] [docs] Fix ProcessWindowFunction code snippets.
asfgit closed pull request #6527: [hotfix] [docs] Fix ProcessWindowFunction code snippets. URL: https://github.com/apache/flink/pull/6527 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md index e5ec0091e07..bebc5dd260e 100644 --- a/docs/dev/stream/operators/windows.md +++ b/docs/dev/stream/operators/windows.md @@ -724,17 +724,19 @@ A `ProcessWindowFunction` can be defined and used like this: DataStream> input = ...; input -.keyBy() -.window() -.process(new MyProcessWindowFunction()); + .keyBy(t -> t.f0) + .timeWindow(Time.minutes(5)) + .process(new MyProcessWindowFunction()); /* ... */ -public class MyProcessWindowFunction extends ProcessWindowFunction, String, String, TimeWindow> { +public class MyProcessWindowFunction +extends ProcessWindowFunction, String, String, TimeWindow> { - void process(String key, Context context, Iterable> input, Collector out) { + @Override + public void process(String key, Context context, Iterable> input, Collector out) { long count = 0; -for (Tuple in: input) { +for (Tuple2 in: input) { count++; } out.collect("Window: " + context.window() + "count: " + count); @@ -749,9 +751,9 @@ public class MyProcessWindowFunction extends ProcessWindowFunction) -.window() -.process(new MyProcessWindowFunction()) + .keyBy(_._1) + .timeWindow(Time.minutes(5)) + .process(new MyProcessWindowFunction()) /* ... */ This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10047) Checkpointing takes much longer with credit base flow control.
[ https://issues.apache.org/jira/browse/FLINK-10047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576320#comment-16576320 ] Piotr Nowojski commented on FLINK-10047: [~zjwang] at the moment it seems that this is not a problem of credit base flow control. Still investigating this. I will keep you posted :) > Checkpointing takes much longer with credit base flow control. > -- > > Key: FLINK-10047 > URL: https://issues.apache.org/jira/browse/FLINK-10047 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.0, 1.5.1, 1.5.2 >Reporter: Piotr Nowojski >Priority: Critical > > As reported by an user, in some scenario it looks like checkpointing takes > significantly more time (~40 minutes compared to ~2 minutes in Flink 1.4). > Probably throughput is also lower. > We are waiting for more logs with detailed Flink's metrics to get more > insight. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] fhueske commented on issue #6527: [hotfix] [docs] Fix ProcessWindowFunction code snippets.
fhueske commented on issue #6527: [hotfix] [docs] Fix ProcessWindowFunction code snippets. URL: https://github.com/apache/flink/pull/6527#issuecomment-412090788 Thanks @dawidwys Merging This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576310#comment-16576310 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r209260792 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ## @@ -117,6 +117,65 @@ public void testPrintSinkWithPrefix() throws Exception { stream.close(); } + @Test + public void testPrintSinkWithIdentifierAndPrefix() throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream stream = new PrintStream(baos); + System.setOut(stream); + + final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class); + Mockito.when(ctx.getNumberOfParallelSubtasks()).thenReturn(2); + Mockito.when(ctx.getIndexOfThisSubtask()).thenReturn(1); + + PrintSinkFunction printSink = new PrintSinkFunction<>(false, "mySink"); + printSink.setRuntimeContext(ctx); + try { + printSink.open(new Configuration()); + } catch (Exception e) { + Assert.fail(); + } + printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0)); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("mySink:2> hello world!" + line, baos.toString()); + + printSink.setTargetToStandardErr(); + assertEquals("Print to System.err", printSink.toString()); + assertEquals("mySink:2> hello world!" + line, baos.toString()); + + printSink.close(); + stream.close(); + } + + @Test + public void testPrintSinkWithIdentifierButNoPrefix() throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); Review comment: There is quite a lot of code duplication in those tests and they were unnecessarily using mockito instead of proper mock. Also there was even a bug in `testPrintSinkStdErr`. 
I have fixed those issues in a hotfix: https://github.com/apache/flink/pull/6538 I would like the hotfix to be merged before this PR; please adapt/rewrite your test in a similar fashion as I did in my hotfix. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Hequn Cheng >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The output of the print method of {{DataSet}} allows the user to supply a > String to identify the output (see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But > {{DataStream}} doesn't support this now. It is valuable to add this feature for > {{DataStream}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576314#comment-16576314 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r209234301 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java ## @@ -55,6 +57,17 @@ public PrintSinkFunction(boolean stdErr) { target = stdErr; } + /** +* Instantiates a print sink function that prints to standard out and gives a sink identifier. +* +* @param stdErr True, if the format should print to standard error instead of standard out. +* @param sinkIdentifier Message that identify sink and is prefixed to the output of the value +*/ + public PrintSinkFunction(boolean stdErr, String sinkIdentifier) { + this(stdErr); + this.sinkIdentifier = sinkIdentifier; Review comment: Usually less detailed constructor is calling the more specific ones, not the other way around. Here it will allow you to mark fields as `final` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Hequn Cheng >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The output of the print method of {[DataSet}} allows the user to supply a > String to identify the output(see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But > {[DataStream}} doesn't support now. 
It is valuable to add this feature for > {{DataStream}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
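The delegation pattern the reviewer describes (the less detailed constructor calling the more specific one, so every field can be `final`) can be sketched as follows; the class and field names are illustrative, not the actual Flink code:

```java
// Illustrative sketch of constructor chaining: each simpler constructor
// delegates to a more specific one, and only the most specific constructor
// assigns the fields, which lets all of them be final.
class PrintSinkSketch {
    private final boolean stdErr;          // print target (stderr vs stdout)
    private final String sinkIdentifier;   // prefix for output lines

    PrintSinkSketch() {
        this(false);
    }

    PrintSinkSketch(boolean stdErr) {
        this(stdErr, "");                  // empty identifier instead of null
    }

    PrintSinkSketch(boolean stdErr, String sinkIdentifier) {
        this.stdErr = stdErr;
        this.sinkIdentifier = sinkIdentifier;
    }

    String getSinkIdentifier() {
        return sinkIdentifier;
    }

    boolean isStdErr() {
        return stdErr;
    }

    public static void main(String[] args) {
        PrintSinkSketch sink = new PrintSinkSketch(false, "mySink");
        System.out.println(sink.getSinkIdentifier()); // mySink
    }
}
```

Chaining in this direction keeps the initialization logic in exactly one place, which is what makes the `final` markers possible.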
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576311#comment-16576311 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r209234901 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java ## @@ -70,15 +83,29 @@ public void open(Configuration parameters) throws Exception { // get the target stream stream = target == STD_OUT ? System.out : System.err; + /** Review comment: please move this to class's java doc. Also replace `sinkId`, `sink id` with `{@code sinkIdentifier} `. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Hequn Cheng >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The output of the print method of {[DataSet}} allows the user to supply a > String to identify the output(see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But > {[DataStream}} doesn't support now. It is valuable to add this feature for > {{DataStream}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576308#comment-16576308 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r209237840 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java ## @@ -70,15 +83,29 @@ public void open(Configuration parameters) throws Exception { // get the target stream stream = target == STD_OUT ? System.out : System.err; + /** +* Four possible format options: +* sinkId:taskId> output <- sink id provided, parallelism > 1 +* sinkId> output <- sink id provided, parallelism == 1 +* taskId> output <- no sink id provided, parallelism > 1 +* output <- no sink id provided, parallelism == 1 +*/ + // set the prefix if we have a >1 parallelism prefix = (context.getNumberOfParallelSubtasks() > 1) ? ((context.getIndexOfThisSubtask() + 1) + "> ") : null; + + if (prefix == null) { Review comment: Don't use nulls here. In this case we can easily use an empty string for the same purpose and it will be safer (no possible NPE). Btw, rephrasing this logic like this: ``` completedPrefix = sinkIdentifier; if (context.getNumberOfParallelSubtasks() > 1) { if (!completedPrefix.isEmpty()) { completedPrefix += ":"; } completedPrefix += (context.getIndexOfThisSubtask() + 1); } if (!completedPrefix.isEmpty()) { completedPrefix += ">"; } ``` (optionally with a ternary operator instead of some 'if' statements - that's only a matter of taste) simplifies the logic and deduplicates some of the code/constants. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Hequn Cheng >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The output of the print method of {[DataSet}} allows the user to supply a > String to identify the output(see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But > {[DataStream}} doesn't support now. It is valuable to add this feature for > {{DataStream}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
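The prefix logic the reviewer sketches can be pulled out into a small pure helper, which also makes the four format options easy to verify; the class name, method name, and parameters below are illustrative:

```java
// Illustrative helper implementing the reviewer's suggested prefix logic.
// Covers the four format options: "sinkId:taskId>", "sinkId>", "taskId>", "".
class PrefixSketch {
    static String buildPrefix(String sinkIdentifier, int parallelSubtasks, int subtaskIndex) {
        String completedPrefix = sinkIdentifier;   // empty string, never null
        if (parallelSubtasks > 1) {
            if (!completedPrefix.isEmpty()) {
                completedPrefix += ":";
            }
            completedPrefix += (subtaskIndex + 1); // 1-based task id
        }
        if (!completedPrefix.isEmpty()) {
            completedPrefix += ">";
        }
        return completedPrefix;
    }

    public static void main(String[] args) {
        System.out.println(buildPrefix("mySink", 2, 1)); // mySink:2>
        System.out.println(buildPrefix("mySink", 1, 0)); // mySink>
        System.out.println(buildPrefix("", 2, 0));       // 1>
    }
}
```

Because the result starts from an empty string and only ever appends, there is no null branch left to guard against.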
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576309#comment-16576309 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r209233947 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java ## @@ -40,6 +40,8 @@ private boolean target; private transient PrintStream stream; private transient String prefix; + private String sinkIdentifier; + private transient String completedPrefix; Review comment: Please drop `prefix` field, it's only a local variable now. Also please change `target` and `sinkIdentifier` into `final` fields. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Hequn Cheng >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The output of the print method of {[DataSet}} allows the user to supply a > String to identify the output(see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But > {[DataStream}} doesn't support now. It is valuable to add this feature for > {{DataStream}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576313#comment-16576313 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r209239877 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ## @@ -117,6 +117,65 @@ public void testPrintSinkWithPrefix() throws Exception { stream.close(); } + @Test + public void testPrintSinkWithIdentifierAndPrefix() throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream stream = new PrintStream(baos); + System.setOut(stream); + + final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class); + Mockito.when(ctx.getNumberOfParallelSubtasks()).thenReturn(2); + Mockito.when(ctx.getIndexOfThisSubtask()).thenReturn(1); + + PrintSinkFunction printSink = new PrintSinkFunction<>(false, "mySink"); + printSink.setRuntimeContext(ctx); + try { + printSink.open(new Configuration()); + } catch (Exception e) { + Assert.fail(); Review comment: Do not hide the original exception. From what I've heard there was some bug with old junit version and that's why this pattern is reoccurring in Flink tests. Regardless if that was the case, it's not the problem anymore, and hiding original exception makes test failures harder to read/understand. (ditto in rest of the file as well). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Hequn Cheng >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The output of the print method of {[DataSet}} allows the user to supply a > String to identify the output(see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But > {[DataStream}} doesn't support now. It is valuable to add this feature for > {{DataStream}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
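The cost of the `catch { Assert.fail(); }` pattern the reviewer objects to can be illustrated outside JUnit; the class and method names below are made up for the example:

```java
// Illustrative contrast: catching an exception and failing generically loses
// the original cause, while reporting the caught exception keeps the message
// available for the failure report.
class ExceptionPropagationSketch {
    static void failingOperation() throws Exception {
        throw new Exception("original cause: connection refused");
    }

    // Anti-pattern: the original exception is discarded, the report is opaque.
    static String swallowed() {
        try {
            failingOperation();
            return "ok";
        } catch (Exception e) {
            return "test failed";            // no hint of what went wrong
        }
    }

    // Preferred direction: surface the cause (in a real test, simply declare
    // `throws Exception` and let the runner print the stack trace).
    static String reported() {
        try {
            failingOperation();
            return "ok";
        } catch (Exception e) {
            return e.getMessage();           // cause survives for the report
        }
    }

    public static void main(String[] args) {
        System.out.println(swallowed());
        System.out.println(reported());
    }
}
```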
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576312#comment-16576312 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r209238315 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/PrintSinkFunction.java ## @@ -89,6 +116,8 @@ public void invoke(IN record) { public void close() { this.stream = null; this.prefix = null; + this.sinkIdentifier = null; + this.completedPrefix = null; Review comment: Please drop this `close` method. It doesn't do anything besides causing an `NPE`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Hequn Cheng >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The output of the print method of {{DataSet}} allows the user to supply a > String to identify the output (see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). But > {{DataStream}} doesn't support this now. It is valuable to add this feature for > {{DataStream}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
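The NPE the reviewer warns about shows up as soon as any record arrives after `close()` has nulled the stream; a minimal sketch (hypothetical class, not the actual sink):

```java
import java.io.PrintStream;

// Illustrative sketch of why nulling fields in close() is dangerous:
// any invoke() after close() hits a NullPointerException.
class ClosingSinkSketch {
    private PrintStream stream = System.out;

    void invoke(String record) {
        stream.println(record);   // NPE if close() already nulled the stream
    }

    void close() {
        this.stream = null;       // the pattern the reviewer asks to drop
    }

    static boolean invokeAfterCloseThrowsNpe() {
        ClosingSinkSketch sink = new ClosingSinkSketch();
        sink.close();
        try {
            sink.invoke("hello");
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(invokeAfterCloseThrowsNpe()); // true
    }
}
```

Since the fields are cheap to keep, dropping the `close()` override entirely is the safer choice.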
[jira] [Commented] (FLINK-9850) Add a string to the print method to identify output for DataStream
[ https://issues.apache.org/jira/browse/FLINK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576307#comment-16576307 ] ASF GitHub Bot commented on FLINK-9850: --- pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r209239178 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ## @@ -117,6 +117,65 @@ public void testPrintSinkWithPrefix() throws Exception { stream.close(); } + @Test + public void testPrintSinkWithIdentifierAndPrefix() throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream stream = new PrintStream(baos); + System.setOut(stream); + + final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class); Review comment: Do not use mockito for such things, it's very difficult to maintain such tests in the future. Instead please move `org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBaseTest.MockRuntimeContext` class to `flink-streaming-java` module to some utility package and reuse it in this whole `PrintSinkFunctionTest`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a string to the print method to identify output for DataStream > -- > > Key: FLINK-9850 > URL: https://issues.apache.org/jira/browse/FLINK-9850 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Hequn Cheng >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The output of the print method of {[DataSet}} allows the user to supply a > String to identify the output(see > [FLINK-1486|https://issues.apache.org/jira/browse/FLINK-1486]). 
But > {[DataStream}} doesn't support now. It is valuable to add this feature for > {{DataStream}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
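A hand-written stub in the spirit of the `MockRuntimeContext` the reviewer mentions needs no mocking framework at all; this is a self-contained sketch, not the actual Flink class:

```java
// Illustrative hand-written stub: fixed parallelism values supplied through
// the constructor, instead of Mockito.mock(...)/when(...).thenReturn(...).
class StubRuntimeContextSketch {
    private final int parallelSubtasks;
    private final int subtaskIndex;

    StubRuntimeContextSketch(int parallelSubtasks, int subtaskIndex) {
        this.parallelSubtasks = parallelSubtasks;
        this.subtaskIndex = subtaskIndex;
    }

    int getNumberOfParallelSubtasks() {
        return parallelSubtasks;
    }

    int getIndexOfThisSubtask() {
        return subtaskIndex;
    }

    public static void main(String[] args) {
        StubRuntimeContextSketch ctx = new StubRuntimeContextSketch(2, 1);
        System.out.println(ctx.getNumberOfParallelSubtasks()); // 2
        System.out.println(ctx.getIndexOfThisSubtask());       // 1
    }
}
```

A plain class like this keeps the test's assumptions explicit and survives refactorings of the real interface far better than stubbed mock calls.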
[GitHub] pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream
pnowojski commented on a change in pull request #6367: [FLINK-9850] Add a string to the print method to identify output for DataStream URL: https://github.com/apache/flink/pull/6367#discussion_r209260792 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ## @@ -117,6 +117,65 @@ public void testPrintSinkWithPrefix() throws Exception { stream.close(); } + @Test + public void testPrintSinkWithIdentifierAndPrefix() throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream stream = new PrintStream(baos); + System.setOut(stream); + + final StreamingRuntimeContext ctx = Mockito.mock(StreamingRuntimeContext.class); + Mockito.when(ctx.getNumberOfParallelSubtasks()).thenReturn(2); + Mockito.when(ctx.getIndexOfThisSubtask()).thenReturn(1); + + PrintSinkFunction printSink = new PrintSinkFunction<>(false, "mySink"); + printSink.setRuntimeContext(ctx); + try { + printSink.open(new Configuration()); + } catch (Exception e) { + Assert.fail(); + } + printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0)); + + assertEquals("Print to System.out", printSink.toString()); + assertEquals("mySink:2> hello world!" + line, baos.toString()); + + printSink.setTargetToStandardErr(); + assertEquals("Print to System.err", printSink.toString()); + assertEquals("mySink:2> hello world!" + line, baos.toString()); + + printSink.close(); + stream.close(); + } + + @Test + public void testPrintSinkWithIdentifierButNoPrefix() throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); Review comment: There is quite a lot of code duplication in these tests and they were unnecessarily using mockito instead of a proper mock. Also there was even a bug in `testPrintSinkStdErr`. I have fixed those issues in a hotfix: https://github.com/apache/flink/pull/6538 I would like the hotfix to be merged before this PR, and please adapt/rewrite your tests in a similar fashion as I did in my hotfix. 
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576269#comment-16576269 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209251987 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ## @@ -204,227 +133,78 @@ public NodeId put( * @throws Exception Thrown if the system cannot access the state. */ public boolean isEmpty() throws Exception { - return Iterables.isEmpty(eventsBuffer.keys()); + return Iterables.isEmpty(eventsBuffer.keys()) && Iterables.isEmpty(eventsBufferCache.keySet()); } /** -* Returns all elements from the previous relation starting at the given entry. -* -* @param nodeId id of the starting entry -* @param version Version of the previous relation which shall be extracted -* @return Collection of previous relations starting with the given value -* @throws Exception Thrown if the system cannot access the state. +* Put an event to cache. 
+* @param eventId id of the event +* @param event event body */ - public List>> extractPatterns( - final NodeId nodeId, - final DeweyNumber version) throws Exception { - - List>> result = new ArrayList<>(); - - // stack to remember the current extraction states - Stack extractionStates = new Stack<>(); - - // get the starting shared buffer entry for the previous relation - Lockable entryLock = entries.get(nodeId); - - if (entryLock != null) { - SharedBufferNode entry = entryLock.getElement(); - extractionStates.add(new ExtractionState(Tuple2.of(nodeId, entry), version, new Stack<>())); - - // use a depth first search to reconstruct the previous relations - while (!extractionStates.isEmpty()) { - final ExtractionState extractionState = extractionStates.pop(); - // current path of the depth first search - final Stack> currentPath = extractionState.getPath(); - final Tuple2 currentEntry = extractionState.getEntry(); - - // termination criterion - if (currentEntry == null) { - final Map> completePath = new LinkedHashMap<>(); - - while (!currentPath.isEmpty()) { - final NodeId currentPathEntry = currentPath.pop().f0; - - String page = currentPathEntry.getPageName(); - List values = completePath - .computeIfAbsent(page, k -> new ArrayList<>()); - values.add(currentPathEntry.getEventId()); - } - result.add(completePath); - } else { - - // append state to the path - currentPath.push(currentEntry); - - boolean firstMatch = true; - for (SharedBufferEdge edge : currentEntry.f1.getEdges()) { - // we can only proceed if the current version is compatible to the version - // of this previous relation - final DeweyNumber currentVersion = extractionState.getVersion(); - if (currentVersion.isCompatibleWith(edge.getDeweyNumber())) { - final NodeId target = edge.getTarget(); - Stack> newPath; - - if (firstMatch) { - // for the first match we don't have to copy the current path - newPath = currentPath; - firstMatch = false; - } else { -
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576268#comment-16576268 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209249252 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ## @@ -204,227 +133,78 @@ public NodeId put( * @throws Exception Thrown if the system cannot access the state. */ public boolean isEmpty() throws Exception { - return Iterables.isEmpty(eventsBuffer.keys()); + return Iterables.isEmpty(eventsBuffer.keys()) && Iterables.isEmpty(eventsBufferCache.keySet()); Review comment: Check the cache first. In case there is sth in the cache we won't need to access the state. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Reduce the count to deal with state during a CEP process > - > > Key: FLINK-9642 > URL: https://issues.apache.org/jira/browse/FLINK-9642 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.6.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > > With the rework of sharedBuffer Flink-9418, the lock & release operation is > deal with rocksdb state which is different from the previous version which > will read the state of sharedBuffer all to memory, i think we can add a cache > or variable in sharedbuffer to cache the LockAble Object to mark the ref > change in once process in NFA, this will reduce the count when the events > point to the same NodeId.. And flush the result to MapState at the end of > process. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
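The cache-first ordering the reviewer asks for is a short-circuit on `&&`; this standalone sketch (hypothetical class, plain maps standing in for the cache and the state backend) makes the saved state access observable:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the reviewer's point: check the in-memory cache
// first, so the (expensive) state backend is consulted only when the cache
// cannot answer on its own.
class CachedBufferSketch {
    private final Map<String, String> cache = new HashMap<>();
    private final Map<String, String> state = new HashMap<>();
    int stateAccesses = 0;   // counts how often the "state backend" is hit

    boolean isEmpty() {
        // Short-circuit: a non-empty cache answers without touching state.
        return cache.isEmpty() && stateIsEmpty();
    }

    private boolean stateIsEmpty() {
        stateAccesses++;
        return state.isEmpty();
    }

    void putInCache(String key, String value) {
        cache.put(key, value);
    }

    public static void main(String[] args) {
        CachedBufferSketch buffer = new CachedBufferSketch();
        buffer.putInCache("event-1", "payload");
        System.out.println(buffer.isEmpty());       // false
        System.out.println(buffer.stateAccesses);   // 0: state never consulted
    }
}
```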
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576254#comment-16576254 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209234513 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java ## @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep.nfa.sharedbuffer; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cep.nfa.DeweyNumber; +import org.apache.flink.util.WrappingRuntimeException; + +import org.apache.commons.lang3.StringUtils; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; + +import static org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A shared buffer implementation which stores values under according state. Additionally, the values can be + * versioned such that it is possible to retrieve their predecessor element in the buffer. + * + * The idea of the implementation is to have a buffer for incoming events with unique ids assigned to them. This way + * we do not need to deserialize events during processing and we store only one copy of the event. + * + * The entries in {@link SharedBufferAccessor} are {@link SharedBufferNode}. The shared buffer node allows to store + * relations between different entries. A dewey versioning scheme allows to discriminate between + * different relations (e.g. preceding element). + * + * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams". 
+ * + * @param <V> Type of the values + * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf"> + * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a> + */ +public class SharedBufferAccessor<V> implements AutoCloseable { + + /** The cache of sharedBuffer. */ + private SharedBuffer<V> sharedBuffer; + + public SharedBufferAccessor(SharedBuffer<V> sharedBuffer) { + this.sharedBuffer = sharedBuffer; + } + + public void setSharedBuffer(SharedBuffer<V> sharedBuffer) { + this.sharedBuffer = sharedBuffer; + } + + /** +* Notifies shared buffer that there will be no events with timestamp lower than the given value. It allows to clear +* internal counters for number of events seen so far per timestamp. +* +* @param timestamp watermark, no earlier events will arrive +* @throws Exception Thrown if the system cannot access the state. +*/ + public void advanceTime(long timestamp) throws Exception { + sharedBuffer.advanceTime(timestamp); + } + + /** +* Adds another unique event to the shared buffer and assigns a unique id for it. It automatically creates a +* lock on this event, so it won't be removed during processing of that event. Therefore the lock should be removed +* after processing all {@link org.apache.flink.cep.nfa.ComputationState}s +* +* NOTE: Should be called only once for each unique event! +* +* @param value event to be registered +* @return unique id of that event that should be used when putting entries to the buffer. +* @throws Exception Thrown if the system cannot access the state. +*/ + public EventId registerEvent(V value, long timestamp) throws Exception { + return sharedBuffer.registerEvent(value, timestamp); + } + + /** +* Stores given value (value + timestamp) under the given state. It assigns a preceding element +* relation to the previous entry. +* +* @param stateName name of the state that the
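The JIRA's core idea — batch lock/release reference-count changes in memory during one NFA processing round and write them to `MapState` only once, when the accessor is closed — might be sketched like this. All names here (`RefCountAccessorSketch`, `lockNode`, `releaseNode`) are illustrative stand-ins, not Flink's actual `SharedBufferAccessor` API, and a plain `Map` stands in for the keyed `MapState`.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: accumulate ref-count deltas in memory, flush to backing state on close. */
class RefCountAccessorSketch implements AutoCloseable {
    private final Map<String, Integer> backingState;           // stand-in for MapState<NodeId, Lockable<...>>
    private final Map<String, Integer> pendingDeltas = new HashMap<>();

    RefCountAccessorSketch(Map<String, Integer> backingState) {
        this.backingState = backingState;
    }

    void lockNode(String nodeId)    { pendingDeltas.merge(nodeId, +1, Integer::sum); }
    void releaseNode(String nodeId) { pendingDeltas.merge(nodeId, -1, Integer::sum); }

    /** One state write per touched node, instead of one per lock/release call. */
    @Override
    public void close() {
        pendingDeltas.forEach((nodeId, delta) -> {
            int updated = backingState.getOrDefault(nodeId, 0) + delta;
            if (updated <= 0) {
                backingState.remove(nodeId);   // ref count dropped to zero: entry can go
            } else {
                backingState.put(nodeId, updated);
            }
        });
        pendingDeltas.clear();
    }
}
```

With try-with-resources, every lock/release inside the block touches only the in-memory map; the expensive state backend sees a single consolidated write per node on exit — which is why repeated references to the same `NodeId` become cheaper.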
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576262#comment-16576262 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209248594 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ## @@ -204,227 +133,78 @@ public NodeId put( * @throws Exception Thrown if the system cannot access the state. */ public boolean isEmpty() throws Exception { - return Iterables.isEmpty(eventsBuffer.keys()); + return Iterables.isEmpty(eventsBuffer.keys()) && Iterables.isEmpty(eventsBufferCache.keySet()); } /** -* Returns all elements from the previous relation starting at the given entry. -* -* @param nodeId id of the starting entry -* @param version Version of the previous relation which shall be extracted -* @return Collection of previous relations starting with the given value -* @throws Exception Thrown if the system cannot access the state. +* Put an event to cache. 
+* @param eventId id of the event +* @param event event body */ - public List<Map<String, List<EventId>>> extractPatterns( - final NodeId nodeId, - final DeweyNumber version) throws Exception { - - List<Map<String, List<EventId>>> result = new ArrayList<>(); - - // stack to remember the current extraction states - Stack<ExtractionState> extractionStates = new Stack<>(); - - // get the starting shared buffer entry for the previous relation - Lockable<SharedBufferNode> entryLock = entries.get(nodeId); - - if (entryLock != null) { - SharedBufferNode entry = entryLock.getElement(); - extractionStates.add(new ExtractionState(Tuple2.of(nodeId, entry), version, new Stack<>())); - - // use a depth first search to reconstruct the previous relations - while (!extractionStates.isEmpty()) { - final ExtractionState extractionState = extractionStates.pop(); - // current path of the depth first search - final Stack<Tuple2<NodeId, SharedBufferNode>> currentPath = extractionState.getPath(); - final Tuple2<NodeId, SharedBufferNode> currentEntry = extractionState.getEntry(); - - // termination criterion - if (currentEntry == null) { - final Map<String, List<EventId>> completePath = new LinkedHashMap<>(); - - while (!currentPath.isEmpty()) { - final NodeId currentPathEntry = currentPath.pop().f0; - - String page = currentPathEntry.getPageName(); - List<EventId> values = completePath - .computeIfAbsent(page, k -> new ArrayList<>()); - values.add(currentPathEntry.getEventId()); - } - result.add(completePath); - } else { - - // append state to the path - currentPath.push(currentEntry); - - boolean firstMatch = true; - for (SharedBufferEdge edge : currentEntry.f1.getEdges()) { - // we can only proceed if the current version is compatible to the version - // of this previous relation - final DeweyNumber currentVersion = extractionState.getVersion(); - if (currentVersion.isCompatibleWith(edge.getDeweyNumber())) { - final NodeId target = edge.getTarget(); - Stack<Tuple2<NodeId, SharedBufferNode>> newPath; - - if (firstMatch) { - // for the first match we don't have to copy the current path - newPath = currentPath; - firstMatch = false; - } else { -
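The `extractPatterns` code quoted above performs a depth-first walk over predecessor edges, follows only edges whose Dewey number is compatible with the current version, and reuses the current path for the first matching edge while copying it for the rest. A much-reduced sketch of that traversal — with plain string versions and a simple prefix test standing in for `DeweyNumber.isCompatibleWith`, so this is an illustration of the algorithm, not Flink's code — could look like:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch of DFS extraction over a predecessor graph with versioned edges. */
class PatternExtractionSketch {
    /** Edge to a predecessor: target node id (null = path terminates) plus a version tag. */
    record Edge(String target, String version) {}

    /** Prefix test standing in for DeweyNumber.isCompatibleWith. */
    static boolean compatible(String current, String edge) {
        return current.startsWith(edge) || edge.startsWith(current);
    }

    /** Returns every start-to-terminal path reachable under the given version. */
    static List<List<String>> extract(Map<String, List<Edge>> edges, String start, String version) {
        List<List<String>> result = new ArrayList<>();
        // Explicit stack instead of recursion, mirroring the extractionStates stack above.
        Deque<Object[]> stack = new ArrayDeque<>();   // {nodeId or null, version, path}
        stack.push(new Object[]{start, version, new ArrayList<String>()});
        while (!stack.isEmpty()) {
            Object[] s = stack.pop();
            String node = (String) s[0];
            String ver = (String) s[1];
            @SuppressWarnings("unchecked")
            List<String> path = (List<String>) s[2];
            if (node == null) {            // termination criterion: a completed path
                result.add(path);
                continue;
            }
            path.add(node);                // append current entry to the path
            boolean firstMatch = true;
            for (Edge e : edges.getOrDefault(node, List.of())) {
                if (!compatible(ver, e.version())) {
                    continue;              // skip relations from an incompatible version
                }
                // Reuse the current path for the first match; copy it for later branches.
                List<String> next = firstMatch ? path : new ArrayList<>(path);
                firstMatch = false;
                stack.push(new Object[]{e.target(), e.version(), next});
            }
        }
        return result;
    }
}
```

The "first match reuses the path" trick is the same allocation-saving choice as the quoted Flink code: in the common case of a single compatible edge, no path copy is ever made.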
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576266#comment-16576266 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209234729 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java ## @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep.nfa.sharedbuffer; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cep.nfa.DeweyNumber; +import org.apache.flink.util.WrappingRuntimeException; + +import org.apache.commons.lang3.StringUtils; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; + +import static org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A shared buffer implementation which stores values under according state. Additionally, the values can be + * versioned such that it is possible to retrieve their predecessor element in the buffer. + * + * The idea of the implementation is to have a buffer for incoming events with unique ids assigned to them. This way + * we do not need to deserialize events during processing and we store only one copy of the event. + * + * The entries in {@link SharedBufferAccessor} are {@link SharedBufferNode}. The shared buffer node allows to store + * relations between different entries. A dewey versioning scheme allows to discriminate between + * different relations (e.g. preceding element). + * + * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams". 
+ * + * @param <V> Type of the values + * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf"> + * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a> + */ +public class SharedBufferAccessor<V> implements AutoCloseable { + + /** The cache of sharedBuffer. */ + private SharedBuffer<V> sharedBuffer; + + public SharedBufferAccessor(SharedBuffer<V> sharedBuffer) { + this.sharedBuffer = sharedBuffer; + } + + public void setSharedBuffer(SharedBuffer<V> sharedBuffer) { + this.sharedBuffer = sharedBuffer; + } + + /** +* Notifies shared buffer that there will be no events with timestamp lower than the given value. It allows to clear +* internal counters for number of events seen so far per timestamp. +* +* @param timestamp watermark, no earlier events will arrive +* @throws Exception Thrown if the system cannot access the state. +*/ + public void advanceTime(long timestamp) throws Exception { + sharedBuffer.advanceTime(timestamp); + } + + /** +* Adds another unique event to the shared buffer and assigns a unique id for it. It automatically creates a +* lock on this event, so it won't be removed during processing of that event. Therefore the lock should be removed +* after processing all {@link org.apache.flink.cep.nfa.ComputationState}s +* +* NOTE: Should be called only once for each unique event! +* +* @param value event to be registered +* @return unique id of that event that should be used when putting entries to the buffer. +* @throws Exception Thrown if the system cannot access the state. +*/ + public EventId registerEvent(V value, long timestamp) throws Exception { + return sharedBuffer.registerEvent(value, timestamp); + } + + /** +* Stores given value (value + timestamp) under the given state. It assigns a preceding element +* relation to the previous entry. +* +* @param stateName name of the state that the
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576252#comment-16576252 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209233907 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java ## @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep.nfa.sharedbuffer; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cep.nfa.DeweyNumber; +import org.apache.flink.util.WrappingRuntimeException; + +import org.apache.commons.lang3.StringUtils; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; + +import static org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A shared buffer implementation which stores values under according state. Additionally, the values can be + * versioned such that it is possible to retrieve their predecessor element in the buffer. + * + * The idea of the implementation is to have a buffer for incoming events with unique ids assigned to them. This way + * we do not need to deserialize events during processing and we store only one copy of the event. + * + * The entries in {@link SharedBufferAccessor} are {@link SharedBufferNode}. The shared buffer node allows to store + * relations between different entries. A dewey versioning scheme allows to discriminate between + * different relations (e.g. preceding element). + * + * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams". 
+ * + * @param <V> Type of the values + * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf"> + * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a> + */ +public class SharedBufferAccessor<V> implements AutoCloseable { + + /** The cache of sharedBuffer. */ + private SharedBuffer<V> sharedBuffer; + + public SharedBufferAccessor(SharedBuffer<V> sharedBuffer) { + this.sharedBuffer = sharedBuffer; + } + + public void setSharedBuffer(SharedBuffer<V> sharedBuffer) { + this.sharedBuffer = sharedBuffer; + } + + /** +* Notifies shared buffer that there will be no events with timestamp lower than the given value. It allows to clear +* internal counters for number of events seen so far per timestamp. +* +* @param timestamp watermark, no earlier events will arrive +* @throws Exception Thrown if the system cannot access the state. +*/ + public void advanceTime(long timestamp) throws Exception { + sharedBuffer.advanceTime(timestamp); + } + + /** +* Adds another unique event to the shared buffer and assigns a unique id for it. It automatically creates a +* lock on this event, so it won't be removed during processing of that event. Therefore the lock should be removed +* after processing all {@link org.apache.flink.cep.nfa.ComputationState}s +* +* NOTE: Should be called only once for each unique event! +* +* @param value event to be registered +* @return unique id of that event that should be used when putting entries to the buffer. +* @throws Exception Thrown if the system cannot access the state. +*/ + public EventId registerEvent(V value, long timestamp) throws Exception { + return sharedBuffer.registerEvent(value, timestamp); + } + + /** +* Stores given value (value + timestamp) under the given state. It assigns a preceding element +* relation to the previous entry. +* +* @param stateName name of the state that the