[jira] [Updated] (FLINK-5861) TaskManager's components support updating JobManagerConnection
[ https://issues.apache.org/jira/browse/FLINK-5861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Biao Liu updated FLINK-5861:
----------------------------
    Description:
Some components in the TaskManager, such as TaskManagerActions, CheckpointResponder, ResultPartitionConsumableNotifier, and PartitionProducerStateChecker, need to support updating the JobManagerConnection. When the JobManager fails and recovers, tasks that hold the old JobManagerConnection can then be notified to update it, and can continue their work without failing.

> TaskManager's components support updating JobManagerConnection
> ---------------------------------------------------------------
>
>                 Key: FLINK-5861
>                 URL: https://issues.apache.org/jira/browse/FLINK-5861
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination, JobManager
>            Reporter: Biao Liu
>            Assignee: Biao Liu
>
> Some components in the TaskManager, such as TaskManagerActions,
> CheckpointResponder, ResultPartitionConsumableNotifier, and
> PartitionProducerStateChecker, need to support updating the
> JobManagerConnection. When the JobManager fails and recovers, tasks that
> hold the old JobManagerConnection can then be notified to update it, and
> can continue their work without failing.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Updated] (FLINK-5861) TaskManager's components support updating JobManagerConnection
[ https://issues.apache.org/jira/browse/FLINK-5861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Biao Liu updated FLINK-5861:
----------------------------
    Summary: TaskManager's components support updating JobManagerConnection  (was: TaskManager's components support updating JobManagerGateway)

> TaskManager's components support updating JobManagerConnection
> ---------------------------------------------------------------
>
>                 Key: FLINK-5861
>                 URL: https://issues.apache.org/jira/browse/FLINK-5861
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination, JobManager
>            Reporter: Biao Liu
>            Assignee: Biao Liu
>
[jira] [Created] (FLINK-5861) TaskManager's components support updating JobManagerGateway
Biao Liu created FLINK-5861:
-------------------------------

             Summary: TaskManager's components support updating JobManagerGateway
                 Key: FLINK-5861
                 URL: https://issues.apache.org/jira/browse/FLINK-5861
             Project: Flink
          Issue Type: Sub-task
            Reporter: Biao Liu
            Assignee: Biao Liu
[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875533#comment-15875533 ]

Yuhong Hong commented on FLINK-5658:
------------------------------------

Hi [~fhueske] & [~wheat9], I saw that Haohui has already committed the rowtime() feature, but it does not yet cover the LogicalWindowRelNode case. To distinguish rowtime() from proctime(), I want to do the following:

LogicalProject (with RexOver expr) -> [normalize rule (ProjectToWindow)] -> CalciteLogicalWindow (input = LogicalProject (with rowtime() func)) -> [normalize rule (FlinkLogicalWindowRule)] -> FlinkLogicalWindow (input = LogicalProject, isEventtime = true/false, window = CalciteLogicalWindow)

Because the rowtime() function is replaced by generated code (via ReduceExpressionsRule) during the normalization phase, we can only check for event time vs. processing time before ReduceExpressionsRule is applied. So I want to add a FlinkLogicalWindow that wraps the CalciteLogicalWindow and carries an isEventtime attribute, and add an additional rule (FlinkLogicalWindowRule) to do the transformation. In FlinkLogicalWindowRule, I will check whether the function operator is EventTimeExtractor or ProcTimeExtractor. What do you think? If there is already a solution, please let me know.

> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> -------------------------------------------------------------------------
>
>                 Key: FLINK-5658
>                 URL: https://issues.apache.org/jira/browse/FLINK-5658
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a parameterless scalar function that just indicates event time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some of the restrictions are trivial to address, we can add the functionality in this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER RANGE aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with RexOver expression).
[jira] [Commented] (FLINK-5692) Add an Option to Deactivate Kryo Fallback for Serializers
[ https://issues.apache.org/jira/browse/FLINK-5692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875518#comment-15875518 ]

Jin Mingjian commented on FLINK-5692:
--------------------------------------

[~StephanEwen], I used "forceCustomSerializerCheck" to follow the existing option naming style. Let me know if you think another name is better.

> Add an Option to Deactivate Kryo Fallback for Serializers
> ----------------------------------------------------------
>
>                 Key: FLINK-5692
>                 URL: https://issues.apache.org/jira/browse/FLINK-5692
>             Project: Flink
>          Issue Type: New Feature
>          Components: Type Serialization System
>    Affects Versions: 1.2.0
>            Reporter: Stephan Ewen
>            Assignee: Jin Mingjian
>              Labels: easyfix, starter
>
> Some users want to avoid that Flink's serializers use Kryo, as it can easily become a hotspot in serialization.
> For those users, it would help if there is a flag to "deactivate generic types". Those users could then see where types are used that default to Kryo and change these types (make them PoJos, Value types, or write custom serializers).
> There are two ways to approach that:
> 1. (Simple) Make {{GenericTypeInfo}} throw an exception whenever it would create a Kryo Serializer (when the respective flag is set in the {{ExecutionConfig}})
> 2. Have a static flag on the {{TypeExtractor}} to throw an exception whenever it would create a {{GenericTypeInfo}}. This approach has the downside of introducing some static configuration to the TypeExtractor, but may be more helpful because it throws exceptions in the programs at points where the types are used (not where the serializers are created, which may be much later).
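Approach 1 from the issue can be sketched with stand-in classes as follows. All names below are illustrative stubs (not Flink's actual {{ExecutionConfig}} or {{GenericTypeInfo}}); the sketch only shows the shape of "throw instead of silently falling back to a generic serializer":

```java
// Sketch of approach 1: a type-info holder that throws when asked to create
// a generic (Kryo-style) serializer while the flag is set. Stub classes,
// not Flink's real API.
public class GenericTypeGuardSketch {

    static class ExecutionConfigStub {
        private boolean genericTypesDisabled;
        void disableGenericTypes() { genericTypesDisabled = true; }
        boolean areGenericTypesDisabled() { return genericTypesDisabled; }
    }

    static class GenericTypeInfoStub<T> {
        private final Class<T> clazz;
        GenericTypeInfoStub(Class<T> clazz) { this.clazz = clazz; }

        // In Flink this would return a Kryo-backed serializer; here we fail
        // fast when the user opted out of generic types.
        Object createSerializer(ExecutionConfigStub config) {
            if (config.areGenericTypesDisabled()) {
                throw new UnsupportedOperationException(
                    "Generic types are disabled, but " + clazz.getName()
                    + " would fall back to a generic (Kryo) serializer.");
            }
            return "kryo-serializer-for-" + clazz.getName(); // placeholder
        }
    }

    public static void main(String[] args) {
        ExecutionConfigStub config = new ExecutionConfigStub();
        config.disableGenericTypes();
        try {
            new GenericTypeInfoStub<>(Object.class).createSerializer(config);
            throw new AssertionError("expected an exception");
        } catch (UnsupportedOperationException expected) {
            System.out.println("ok");
        }
    }
}
```

This fails at the point where the serializer would be created, which is exactly the trade-off the issue notes for approach 1 versus failing earlier in the {{TypeExtractor}}.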
[GitHub] flink pull request #3373: [FLINK-5692] [config] Add an Option to Deactivate ...
GitHub user jinmingjian opened a pull request:

    https://github.com/apache/flink/pull/3373

    [FLINK-5692] [config] Add an Option to Deactivate Kryo Fallback for Serializers

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    - [x] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jinmingjian/flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3373.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3373

----
commit 1ff46e53efa2094ac6881b1ca014bf7752277ff2
Author: Jin Mingjian
Date:   2017-02-21T03:57:21Z

    [FLINK-5692] [config] Add an Option to Deactivate Kryo Fallback for Serializers

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well.
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5692) Add an Option to Deactivate Kryo Fallback for Serializers
[ https://issues.apache.org/jira/browse/FLINK-5692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875513#comment-15875513 ]

ASF GitHub Bot commented on FLINK-5692:
----------------------------------------

GitHub user jinmingjian opened a pull request:

    https://github.com/apache/flink/pull/3373

    [FLINK-5692] [config] Add an Option to Deactivate Kryo Fallback for Serializers

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    - [x] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jinmingjian/flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3373.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3373

----
commit 1ff46e53efa2094ac6881b1ca014bf7752277ff2
Author: Jin Mingjian
Date:   2017-02-21T03:57:21Z

    [FLINK-5692] [config] Add an Option to Deactivate Kryo Fallback for Serializers

> Add an Option to Deactivate Kryo Fallback for Serializers
> ----------------------------------------------------------
>
>                 Key: FLINK-5692
>                 URL: https://issues.apache.org/jira/browse/FLINK-5692
>             Project: Flink
>          Issue Type: New Feature
>          Components: Type Serialization System
>    Affects Versions: 1.2.0
>            Reporter: Stephan Ewen
>            Assignee: Jin Mingjian
>              Labels: easyfix, starter
>
> Some users want to avoid that Flink's serializers use Kryo, as it can easily become a hotspot in serialization.
> For those users, it would help if there is a flag to "deactivate generic types". Those users could then see where types are used that default to Kryo and change these types (make them PoJos, Value types, or write custom serializers).
> There are two ways to approach that:
> 1. (Simple) Make {{GenericTypeInfo}} throw an exception whenever it would create a Kryo Serializer (when the respective flag is set in the {{ExecutionConfig}})
> 2. Have a static flag on the {{TypeExtractor}} to throw an exception whenever it would create a {{GenericTypeInfo}}. This approach has the downside of introducing some static configuration to the TypeExtractor, but may be more helpful because it throws exceptions in the programs at points where the types are used (not where the serializers are created, which may be much later).
[jira] [Commented] (FLINK-5524) Support early out for code generated conjunctive conditions
[ https://issues.apache.org/jira/browse/FLINK-5524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875459#comment-15875459 ]

ASF GitHub Bot commented on FLINK-5524:
----------------------------------------

GitHub user KurtYoung opened a pull request:

    https://github.com/apache/flink/pull/3372

    [FLINK-5524] [table] Support early out for code generated AND/OR condition

    For a condition like a AND b, if the result of a is false, we can skip evaluating b. For a condition like a OR b, if the result of a is true, we can likewise skip evaluating b.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/KurtYoung/flink flink-5524

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3372.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3372

----
commit 4f8c07ffc6ac99da67803114edb732e20df793e6
Author: Kurt Young
Date:   2017-02-21T06:35:17Z

    [FLINK-5524] [table] Support early out for code generated AND/OR conditions

> Support early out for code generated conjunctive conditions
> ------------------------------------------------------------
>
>                 Key: FLINK-5524
>                 URL: https://issues.apache.org/jira/browse/FLINK-5524
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>    Affects Versions: 1.2.0, 1.1.4, 1.3.0
>            Reporter: Fabian Hueske
>            Assignee: Kurt Young
>
> Currently, all nested conditions for a conjunctive predicate are evaluated before the conjunction is checked.
> A condition like {{(v1 == v2) && (v3 < 5)}} would be compiled into
> {code}
> boolean res1;
> if (v1 == v2) {
>   res1 = true;
> } else {
>   res1 = false;
> }
> boolean res2;
> if (v3 < 5) {
>   res2 = true;
> } else {
>   res2 = false;
> }
> boolean res3;
> if (res1 && res2) {
>   res3 = true;
> } else {
>   res3 = false;
> }
> if (res3) {
>   // emit something
> }
> {code}
> It would be better to leave the generated code as early as possible, e.g., with a {{return}} instead of {{res1 = false}}. The code generator needs a bit of context information for that.
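The early-out rewrite discussed in FLINK-5524 can be sketched in plain Java as follows. This is a minimal illustration of the intended shape of the generated code, not Flink's actual codegen output; the method names are hypothetical:

```java
// Contrasts the naive generated-code shape from the issue with the
// early-out shape: each failed conjunct returns immediately instead of
// materializing intermediate booleans. Illustrative only.
public class EarlyOutSketch {

    // Naive shape: every condition is evaluated before the conjunction.
    static boolean evaluateNaive(int v1, int v2, int v3) {
        boolean res1 = (v1 == v2);
        boolean res2 = (v3 < 5);
        return res1 && res2;
    }

    // Early-out shape: leave as soon as one conjunct is false, so the
    // second condition is never evaluated when v1 != v2.
    static boolean evaluateEarlyOut(int v1, int v2, int v3) {
        if (v1 != v2) {
            return false; // early out; (v3 < 5) is skipped
        }
        if (v3 >= 5) {
            return false;
        }
        return true; // emit something
    }

    public static void main(String[] args) {
        // Both shapes must agree on all inputs.
        int[][] cases = { {1, 1, 3}, {1, 2, 3}, {1, 1, 9} };
        for (int[] c : cases) {
            if (evaluateNaive(c[0], c[1], c[2]) != evaluateEarlyOut(c[0], c[1], c[2])) {
                throw new AssertionError();
            }
        }
        System.out.println("ok");
    }
}
```

For OR, the symmetric rewrite returns true as soon as one disjunct is true.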
[GitHub] flink pull request #3372: [FLINK-5524] [table] Support early out for code ge...
GitHub user KurtYoung opened a pull request:

    https://github.com/apache/flink/pull/3372

    [FLINK-5524] [table] Support early out for code generated AND/OR condition

    For a condition like a AND b, if the result of a is false, we can skip evaluating b. For a condition like a OR b, if the result of a is true, we can likewise skip evaluating b.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/KurtYoung/flink flink-5524

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3372.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3372

----
commit 4f8c07ffc6ac99da67803114edb732e20df793e6
Author: Kurt Young
Date:   2017-02-21T06:35:17Z

    [FLINK-5524] [table] Support early out for code generated AND/OR conditions
[jira] [Commented] (FLINK-5830) OutOfMemoryError during notify final state in TaskExecutor may cause job stuck
[ https://issues.apache.org/jira/browse/FLINK-5830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875444#comment-15875444 ]

ASF GitHub Bot commented on FLINK-5830:
----------------------------------------

Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/3360

    @StephanEwen, I have already submitted the modifications.

> OutOfMemoryError during notify final state in TaskExecutor may cause job stuck
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-5830
>                 URL: https://issues.apache.org/jira/browse/FLINK-5830
>             Project: Flink
>          Issue Type: Bug
>            Reporter: zhijiang
>            Assignee: zhijiang
>
> The scenario is like this:
> {{JobMaster}} tries to cancel all the executions when processing a failed execution, and the task executor has already acknowledged the cancel RPC message.
> When notifying the final state in the {{TaskExecutor}}, an OOM occurs in the {{AkkaRpcActor}}, and this error is caught merely to log the info. The final state will not be sent any more.
> The {{JobMaster}} cannot receive the final state and trigger the restart strategy.
> One solution is to catch the {{OutOfMemoryError}} and rethrow it, which will shut down the {{ActorSystem}} and thereby exit the {{TaskExecutor}}. The {{JobMaster}} can then be notified of the {{TaskExecutor}} failure and fail all the tasks to trigger a restart successfully.
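The proposed fix can be sketched as follows. This is an assumption-level illustration of the "escalate fatal errors instead of log-and-continue" pattern, not Flink's actual {{AkkaRpcActor}} code; all names are hypothetical:

```java
// Sketch: a message handler that logs and survives ordinary exceptions but
// rethrows fatal errors (e.g. OutOfMemoryError) so the enclosing runtime
// can terminate and the master can observe the failure. Illustrative only.
public class FatalErrorEscalationSketch {

    // Returns true if the message was handled, false if a recoverable
    // exception was swallowed; fatal errors propagate.
    static boolean handleMessage(Runnable messageHandler) {
        try {
            messageHandler.run();
            return true;
        } catch (OutOfMemoryError fatal) {
            // Previously this would only be logged and the final-state
            // notification was silently lost; rethrowing lets the process
            // die so recovery can be triggered.
            throw fatal;
        } catch (RuntimeException recoverable) {
            return false; // log-and-continue is fine for ordinary exceptions
        }
    }

    // Helper for testing: does an OOM thrown inside the handler escalate?
    static boolean escalates() {
        try {
            handleMessage(() -> { throw new OutOfMemoryError("simulated"); });
            return false;
        } catch (OutOfMemoryError e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("escalated=" + escalates());
    }
}
```

The key point is the asymmetry: exceptions may be recoverable per message, but an `Error` means the process state is no longer trustworthy, so hiding it can strand the job.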
[GitHub] flink issue #3360: [FLINK-5830][Distributed Coordination] Handle OutOfMemory...
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/3360

    @StephanEwen, I have already submitted the modifications.
[jira] [Assigned] (FLINK-5524) Support early out for code generated conjunctive conditions
[ https://issues.apache.org/jira/browse/FLINK-5524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kurt Young reassigned FLINK-5524:
---------------------------------
    Assignee: Kurt Young

> Support early out for code generated conjunctive conditions
> ------------------------------------------------------------
>
>                 Key: FLINK-5524
>                 URL: https://issues.apache.org/jira/browse/FLINK-5524
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>    Affects Versions: 1.2.0, 1.1.4, 1.3.0
>            Reporter: Fabian Hueske
>            Assignee: Kurt Young
>
> Currently, all nested conditions for a conjunctive predicate are evaluated before the conjunction is checked.
> A condition like {{(v1 == v2) && (v3 < 5)}} would be compiled into
> {code}
> boolean res1;
> if (v1 == v2) {
>   res1 = true;
> } else {
>   res1 = false;
> }
> boolean res2;
> if (v3 < 5) {
>   res2 = true;
> } else {
>   res2 = false;
> }
> boolean res3;
> if (res1 && res2) {
>   res3 = true;
> } else {
>   res3 = false;
> }
> if (res3) {
>   // emit something
> }
> {code}
> It would be better to leave the generated code as early as possible, e.g., with a {{return}} instead of {{res1 = false}}. The code generator needs a bit of context information for that.
[jira] [Commented] (FLINK-4856) Add MapState for keyed streams
[ https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875426#comment-15875426 ]

ASF GitHub Bot commented on FLINK-4856:
----------------------------------------

Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3336#discussion_r102138099

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java ---
    @@ -484,6 +487,71 @@ public static Throwable deserializeServerFailure(ByteBuf buf) throws IOException
                return null;
            }
        }
    +
    +	/**
    +	 * Serializes all values of the Iterable with the given serializer.
    +	 *
    +	 * @param entries Key-value pairs to serialize
    +	 * @param keySerializer Serializer for UK
    +	 * @param valueSerializer Serializer for UV
    +	 * @param <UK> Type of the keys
    +	 * @param <UV> Type of the values
    +	 * @return Serialized values or null if values null or empty
    +	 * @throws IOException On failure during serialization
    +	 */
    +	public static <UK, UV> byte[] serializeMap(Iterable<Map.Entry<UK, UV>> entries, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) throws IOException {
    +		if (entries != null) {
    +			Iterator<Map.Entry<UK, UV>> it = entries.iterator();
    +
    +			if (it.hasNext()) {
    +				// Serialize
    +				DataOutputSerializer dos = new DataOutputSerializer(32);
    +
    +				while (it.hasNext()) {
    +					Map.Entry<UK, UV> entry = it.next();
    +
    +					keySerializer.serialize(entry.getKey(), dos);
    +					valueSerializer.serialize(entry.getValue(), dos);
    +				}
    +
    +				return dos.getCopyOfBuffer();
    +			} else {
    +				return null;
    --- End diff --

    The function is unused now. I will delete it in the update.

> Add MapState for keyed streams
> ------------------------------
>
>                 Key: FLINK-4856
>                 URL: https://issues.apache.org/jira/browse/FLINK-4856
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API, State Backends, Checkpointing
>            Reporter: Xiaogang Shi
>            Assignee: Xiaogang Shi
>
> Many states in keyed streams are organized as key-value pairs. Currently, these states are implemented by storing the entire map into a ValueState or a ListState. The implementation however is very costly because all entries have to be serialized/deserialized when updating a single entry. To improve the efficiency of these states, MapStates are urgently needed.
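The entry-by-entry map serialization discussed in the diff can be illustrated in a self-contained way. The sketch below uses plain `java.io` streams instead of Flink's `TypeSerializer`/`DataOutputSerializer`, and assumes long keys with string values; it mirrors the "null if the iterable is empty" contract from the quoted code:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Minimal sketch of entry-by-entry map (de)serialization. Assumption: keys
// are longs and values are UTF-8 strings; real Flink code delegates both to
// pluggable TypeSerializers.
public class MapSerializationSketch {

    static byte[] serializeMap(Iterable<Map.Entry<Long, String>> entries) throws IOException {
        Iterator<Map.Entry<Long, String>> it = entries.iterator();
        if (!it.hasNext()) {
            return null; // mirror the "null if empty" contract from the diff
        }
        ByteArrayOutputStream bos = new ByteArrayOutputStream(32);
        DataOutputStream dos = new DataOutputStream(bos);
        while (it.hasNext()) {
            Map.Entry<Long, String> entry = it.next();
            dos.writeLong(entry.getKey());   // key, then value, per entry
            dos.writeUTF(entry.getValue());
        }
        return bos.toByteArray();
    }

    static Map<Long, String> deserializeMap(byte[] bytes) throws IOException {
        Map<Long, String> result = new HashMap<>();
        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
        while (dis.available() > 0) {
            result.put(dis.readLong(), dis.readUTF());
        }
        return result;
    }

    // Helper for testing: serialize, deserialize, and compare.
    static boolean roundTripOk() {
        try {
            Map<Long, String> m = new HashMap<>();
            m.put(1L, "a");
            m.put(2L, "b");
            return m.equals(deserializeMap(serializeMap(m.entrySet())))
                && serializeMap(new HashMap<Long, String>().entrySet()) == null;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("roundTripOk=" + roundTripOk());
    }
}
```

This also shows why MapState helps: a single entry can be (de)serialized on its own, rather than round-tripping the whole map as one blob.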
[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState
Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3336#discussion_r102138099

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java ---
    @@ -484,6 +487,71 @@ public static Throwable deserializeServerFailure(ByteBuf buf) throws IOException
                return null;
            }
        }
    +
    +	/**
    +	 * Serializes all values of the Iterable with the given serializer.
    +	 *
    +	 * @param entries Key-value pairs to serialize
    +	 * @param keySerializer Serializer for UK
    +	 * @param valueSerializer Serializer for UV
    +	 * @param <UK> Type of the keys
    +	 * @param <UV> Type of the values
    +	 * @return Serialized values or null if values null or empty
    +	 * @throws IOException On failure during serialization
    +	 */
    +	public static <UK, UV> byte[] serializeMap(Iterable<Map.Entry<UK, UV>> entries, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) throws IOException {
    +		if (entries != null) {
    +			Iterator<Map.Entry<UK, UV>> it = entries.iterator();
    +
    +			if (it.hasNext()) {
    +				// Serialize
    +				DataOutputSerializer dos = new DataOutputSerializer(32);
    +
    +				while (it.hasNext()) {
    +					Map.Entry<UK, UV> entry = it.next();
    +
    +					keySerializer.serialize(entry.getKey(), dos);
    +					valueSerializer.serialize(entry.getValue(), dos);
    +				}
    +
    +				return dos.getCopyOfBuffer();
    +			} else {
    +				return null;
    --- End diff --

    The function is unused now. I will delete it in the update.
[jira] [Commented] (FLINK-5780) Extend ConfigOption with descriptions
[ https://issues.apache.org/jira/browse/FLINK-5780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875418#comment-15875418 ]

shijinkui commented on FLINK-5780:
-----------------------------------

This just sounds like an extension of Apache Commons CLI: https://commons.apache.org/proper/commons-cli/
IMO, the Commons CLI style is the standard, and I like it. Is that so?

> Extend ConfigOption with descriptions
> -------------------------------------
>
>                 Key: FLINK-5780
>                 URL: https://issues.apache.org/jira/browse/FLINK-5780
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Core, Documentation
>            Reporter: Ufuk Celebi
>
> The {{ConfigOption}} type is meant to replace the flat {{ConfigConstants}}.
> As part of automating the generation of a docs config page we need to extend {{ConfigOption}} with description fields.
> From the ML discussion, these could be:
> {code}
> void shortDescription(String);
> void longDescription(String);
> {code}
> In practice, the description string should contain HTML/Markdown.
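One way the proposal could look is the sketch below. The two method names come from the ML quote in the issue; the fluent builder shape and the example option key are assumptions, not Flink's final API:

```java
// Sketch of a ConfigOption that carries short/long descriptions for
// generated docs pages. Method names follow the ML quote; everything else
// is an assumed, illustrative design.
public class ConfigOptionSketch {

    static final class ConfigOption<T> {
        final String key;
        final T defaultValue;
        String shortDescription = "";
        String longDescription = "";

        ConfigOption(String key, T defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }

        // Fluent setters so option declarations stay one expression.
        ConfigOption<T> shortDescription(String s) { this.shortDescription = s; return this; }
        ConfigOption<T> longDescription(String s) { this.longDescription = s; return this; }
    }

    public static void main(String[] args) {
        ConfigOption<Integer> parallelism =
            new ConfigOption<>("parallelism.default", 1)
                .shortDescription("Default parallelism for jobs.")
                .longDescription("Used when no parallelism is set explicitly; may contain HTML/Markdown.");
        System.out.println(parallelism.key + ": " + parallelism.shortDescription);
    }
}
```

A docs generator could then iterate over all declared options and emit key, default, and both descriptions into the config reference page.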
[jira] [Commented] (FLINK-4856) Add MapState for keyed streams
[ https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875396#comment-15875396 ]

ASF GitHub Bot commented on FLINK-4856:
----------------------------------------

Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3336#discussion_r102135289

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java ---
    @@ -410,6 +415,124 @@ public void testDeserializeListTooShort2() throws Exception {
            KvStateRequestSerializer.deserializeList(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 3}, LongSerializer.INSTANCE);
        }
    +
    +	/**
    +	 * Tests map serialization utils.
    +	 */
    +	@Test
    +	public void testMapSerialization() throws Exception {
    +		final long key = 0L;
    +
    +		// objects for heap state list serialisation
    +		final HeapKeyedStateBackend longHeapKeyedStateBackend =
    +			new HeapKeyedStateBackend<>(
    +				mock(TaskKvStateRegistry.class),
    +				LongSerializer.INSTANCE,
    +				ClassLoader.getSystemClassLoader(),
    +				1, new KeyGroupRange(0, 0)
    +			);
    +		longHeapKeyedStateBackend.setCurrentKey(key);
    +
    +		final InternalMapState mapState = longHeapKeyedStateBackend.createMapState(
    +			VoidNamespaceSerializer.INSTANCE,
    +			new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
    +
    +		testMapSerialization(key, mapState);
    +	}
    +
    +	/**
    +	 * Verifies that the serialization of a map using the given map state
    +	 * matches the deserialization with {@link KvStateRequestSerializer#deserializeList}.
    +	 *
    +	 * @param key
    +	 * 		key of the map state
    +	 * @param mapState
    +	 * 		map state using the {@link VoidNamespace}, must also be a {@link InternalKvState} instance
    +	 *
    +	 * @throws Exception
    +	 */
    +	public static void testMapSerialization(
    +			final long key,
    +			final InternalMapState mapState) throws Exception {
    +
    +		TypeSerializer userKeySerializer = LongSerializer.INSTANCE;
    +		TypeSerializer userValueSerializer = StringSerializer.INSTANCE;
    +		mapState.setCurrentNamespace(VoidNamespace.INSTANCE);
    +
    +		// List
    +		final int numElements = 10;
    +
    +		final Map expectedValues = new HashMap<>();
    +		for (int i = 0; i < numElements; i++) {
    +			final long value = ThreadLocalRandom.current().nextLong();
    --- End diff --

    I prefer to use `ThreadLocalRandom.current()`, which is also used by other tests in this file. Though it makes it difficult to reproduce a failing case, it may help to find corner cases.

> Add MapState for keyed streams
> ------------------------------
>
>                 Key: FLINK-4856
>                 URL: https://issues.apache.org/jira/browse/FLINK-4856
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API, State Backends, Checkpointing
>            Reporter: Xiaogang Shi
>            Assignee: Xiaogang Shi
>
> Many states in keyed streams are organized as key-value pairs. Currently, these states are implemented by storing the entire map into a ValueState or a ListState. The implementation however is very costly because all entries have to be serialized/deserialized when updating a single entry. To improve the efficiency of these states, MapStates are urgently needed.
[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState
Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3336#discussion_r102135289

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java ---
    @@ -410,6 +415,124 @@ public void testDeserializeListTooShort2() throws Exception {
            KvStateRequestSerializer.deserializeList(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 3}, LongSerializer.INSTANCE);
        }
    +
    +	/**
    +	 * Tests map serialization utils.
    +	 */
    +	@Test
    +	public void testMapSerialization() throws Exception {
    +		final long key = 0L;
    +
    +		// objects for heap state list serialisation
    +		final HeapKeyedStateBackend longHeapKeyedStateBackend =
    +			new HeapKeyedStateBackend<>(
    +				mock(TaskKvStateRegistry.class),
    +				LongSerializer.INSTANCE,
    +				ClassLoader.getSystemClassLoader(),
    +				1, new KeyGroupRange(0, 0)
    +			);
    +		longHeapKeyedStateBackend.setCurrentKey(key);
    +
    +		final InternalMapState mapState = longHeapKeyedStateBackend.createMapState(
    +			VoidNamespaceSerializer.INSTANCE,
    +			new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
    +
    +		testMapSerialization(key, mapState);
    +	}
    +
    +	/**
    +	 * Verifies that the serialization of a map using the given map state
    +	 * matches the deserialization with {@link KvStateRequestSerializer#deserializeList}.
    +	 *
    +	 * @param key
    +	 * 		key of the map state
    +	 * @param mapState
    +	 * 		map state using the {@link VoidNamespace}, must also be a {@link InternalKvState} instance
    +	 *
    +	 * @throws Exception
    +	 */
    +	public static void testMapSerialization(
    +			final long key,
    +			final InternalMapState mapState) throws Exception {
    +
    +		TypeSerializer userKeySerializer = LongSerializer.INSTANCE;
    +		TypeSerializer userValueSerializer = StringSerializer.INSTANCE;
    +		mapState.setCurrentNamespace(VoidNamespace.INSTANCE);
    +
    +		// List
    +		final int numElements = 10;
    +
    +		final Map expectedValues = new HashMap<>();
    +		for (int i = 0; i < numElements; i++) {
    +			final long value = ThreadLocalRandom.current().nextLong();
    --- End diff --

    I prefer to use `ThreadLocalRandom.current()`, which is also used by other tests in this file. Though it makes it difficult to reproduce a failing case, it may help to find corner cases.
[jira] [Commented] (FLINK-5803) Add [partitioned] processing time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875351#comment-15875351 ]

sunjincheng commented on FLINK-5803:
-------------------------------------

Hi [~fhueske], following the design I described above, I have completed an implementation that does not carry procTime(). Should I open a PR, or wait for FLINK-5710?

> Add [partitioned] processing time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-5803
>                 URL: https://issues.apache.org/jira/browse/FLINK-5803
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> The goal of this issue is to add support for OVER RANGE aggregations on processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a parameterless scalar function that just indicates processing time mode.
> - bounded PRECEDING is not supported (see FLINK-5654)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some of the restrictions are trivial to address, we can add the functionality in this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER RANGE aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with RexOver expression).
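What the `UNBOUNDED PRECEDING ... CURRENT ROW` OVER aggregate computes can be shown with a tiny standalone sketch: for each incoming row, emit the aggregate over all rows seen so far in the same partition. Per-partition state is just a running accumulator, which is why no window buffer is needed. This is illustrative Java, not Flink's DataStream operator:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Sketch of an unbounded-preceding OVER aggregate: keeps one running
// accumulator (sum and min of b) per partition key c, mirroring the
// SELECT from the issue. Illustrative only.
public class UnboundedOverSketch {

    static final class Acc {
        long sum;
        long min = Long.MAX_VALUE;
    }

    private final Map<String, Acc> stateByKey = new HashMap<>();

    // Processes one row and returns [sumB, minB] for it.
    long[] processRow(String partitionKey, long b) {
        Acc acc = stateByKey.computeIfAbsent(partitionKey, k -> new Acc());
        acc.sum += b;
        acc.min = Math.min(acc.min, b);
        return new long[] { acc.sum, acc.min };
    }

    public static void main(String[] args) {
        UnboundedOverSketch op = new UnboundedOverSketch();
        op.processRow("c1", 3);                   // partition c1: sum=3, min=3
        long[] r = op.processRow("c1", 1);        // partition c1: sum=4, min=1
        op.processRow("c2", 5);                   // separate partition c2
        System.out.println(Arrays.toString(r));   // [4, 1]
    }
}
```

With no PARTITION BY clause, all rows share one accumulator, which is why the non-partitioned case executes single-threaded.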
[jira] [Commented] (FLINK-4856) Add MapState for keyed streams
[ https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875342#comment-15875342 ] ASF GitHub Bot commented on FLINK-4856: --- Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3336#discussion_r102129362 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -834,7 +836,7 @@ private void restoreKVStateData() throws IOException, RocksDBException { } @Override - protected InternalValueState createValueState( + public InternalValueState createValueState( --- End diff -- It is mainly due to the unit tests in `KvStateRequestSerializerTest`, which need the accessors to `InternalKvState`. A better choice is to use `getPartitionState()` to obtain a user-facing state and convert it to an internal state. What do you think? > Add MapState for keyed streams > -- > > Key: FLINK-4856 > URL: https://issues.apache.org/jira/browse/FLINK-4856 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Many states in keyed streams are organized as key-value pairs. Currently, > these states are implemented by storing the entire map into a ValueState or a > ListState. The implementation however is very costly because all entries have > to be serialized/deserialized when updating a single entry. To improve the > efficiency of these states, MapStates are urgently needed. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4856) Add MapState for keyed streams
[ https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875328#comment-15875328 ] ASF GitHub Bot commented on FLINK-4856: --- Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3336#discussion_r102128355 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java --- @@ -93,6 +95,18 @@ public DefaultKeyedStateStore(KeyedStateBackend keyedStateBackend, ExecutionC } } + @Override + public MapState getMapState(MapStateDescriptor stateProperties) { + requireNonNull(stateProperties, "The state properties must not be null"); + try { + stateProperties.initializeSerializerUnlessSet(executionConfig); + MapState originalState = getPartitionedState(stateProperties); + return new UserFacingMapState<>(originalState); + } catch (Exception e) { + throw new RuntimeException("Error while getting state", e); --- End diff -- Currently, `KeyedStateStore#getState()` does not declare any thrown exceptions, so `RuntimeException` is the only exception that can be thrown. Since modifying the interface would affect user code (users would have to handle the newly declared exceptions), I am not sure it's okay to change the method declaration in `KeyedStateStore`. > Add MapState for keyed streams > -- > > Key: FLINK-4856 > URL: https://issues.apache.org/jira/browse/FLINK-4856 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Many states in keyed streams are organized as key-value pairs. Currently, > these states are implemented by storing the entire map into a ValueState or a > ListState. The implementation however is very costly because all entries have > to be serialized/deserialized when updating a single entry. To improve the > efficiency of these states, MapStates are urgently needed. 
-- This message was sent by Atlassian JIRA (v6.3.15#6346)
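The issue description's motivation (a map stored as one ValueState blob must be fully deserialized and reserialized on every update) can be illustrated with a standalone sketch. The class and key layout below are illustrative, not Flink's actual backend code: a MapState-style layout addresses each map entry under its own composite key, so a single put touches only that entry.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch (names are illustrative, not Flink's) of the storage layout that
 * motivates MapState: instead of one serialized blob per keyed-stream key,
 * each user map entry is stored under a composite key, so updating one
 * entry does not rewrite the whole map.
 */
public class MapStateLayoutSketch {

    // Flat store keyed by "<currentKey>#<userKey>", mimicking how a backend
    // such as RocksDB can address individual map entries.
    private final Map<String, String> store = new HashMap<>();

    public void put(String currentKey, String userKey, String value) {
        store.put(currentKey + "#" + userKey, value); // touches a single entry
    }

    public String get(String currentKey, String userKey) {
        return store.get(currentKey + "#" + userKey);
    }

    public static void main(String[] args) {
        MapStateLayoutSketch s = new MapStateLayoutSketch();
        s.put("k1", "a", "1");
        s.put("k1", "b", "2"); // does not rewrite the entry for ("k1", "a")
        System.out.println(s.get("k1", "a") + " " + s.get("k1", "b"));
    }
}
```

With the ValueState approach the cost of one update is O(size of the whole map); with the per-entry layout it is O(size of one entry), which is the efficiency gain the issue asks for.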
[jira] [Commented] (FLINK-4856) Add MapState for keyed streams
[ https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875321#comment-15875321 ] ASF GitHub Bot commented on FLINK-4856: --- Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3336#discussion_r102127867 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java --- @@ -0,0 +1,579 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteOptions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; + +/** + * {@link MapState} implementation that stores state in RocksDB. + * + * {@link RocksDBStateBackend} must ensure that we set the + * {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since + * we use the {@code merge()} call. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <UK> The type of the keys in the map state. + * @param <UV> The type of the values in the map state. + */ +public class RocksDBMapState<K, N, UK, UV> + extends AbstractRocksDBState<K, N, MapStateDescriptor<UK, UV>, Map<UK, UV>> + implements InternalMapState<N, UK, UV> { + + /** Serializer for the keys and values */ + private final TypeSerializer<UK> userKeySerializer; + private final TypeSerializer<UV> userValueSerializer; + + /** +* We disable writes to the write-ahead-log here. 
We can't have these in the base class +* because JNI segfaults for some reason if they are. +*/ + private final WriteOptions writeOptions; + + /** +* Creates a new {@code RocksDBMapState}. +* +* @param namespaceSerializer The serializer for the namespace. +* @param stateDesc The state identifier for the state. +*/ + public RocksDBMapState(ColumnFamilyHandle columnFamily, + TypeSerializer<N> namespaceSerializer, + MapStateDescriptor<UK, UV> stateDesc, + RocksDBKeyedStateBackend<K> backend) { + + super(columnFamily, namespaceSerializer, stateDesc, backend); + + this.userKeySerializer = stateDesc.getKeySerializer(); + this.userValueSerializer = stateDesc.getValueSerializer(); + + writeOptions = new WriteOptions(); + writeOptions.setDisableWAL(true); + } + + // + // MapState Implementation + // + + @Override + public UV get(UK userKey) throws IOException { + try { +
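The quoted `RocksDBMapState` excerpt stores each map entry as its own RocksDB key/value pair, with the key built from the current keyed-stream key, the namespace, and the user map key (the excerpt's `serializeUserKeyWithCurrentKeyAndNamespace`). The following is a hedged sketch of that composite-key idea only; Flink's actual wire format differs (key-group prefixes, real TypeSerializers), and the method name here is mine.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

/**
 * Illustrative sketch of a composite RocksDB key for one map-state entry:
 * concatenated serialized (currentKey, namespace, userKey). Equal triples
 * yield equal byte keys, so each user map entry is individually addressable.
 * Not Flink's actual serialization format.
 */
public class CompositeKeySketch {

    public static byte[] compositeKey(String currentKey, String namespace, String userKey) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(currentKey); // serialized current (keyed-stream) key
            out.writeUTF(namespace);  // serialized namespace
            out.writeUTF(userKey);    // serialized user map key
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e); // cannot happen for in-memory streams
        }
    }

    public static void main(String[] args) {
        byte[] a = compositeKey("key-1", "window-0", "field");
        byte[] b = compositeKey("key-1", "window-0", "field");
        byte[] c = compositeKey("key-2", "window-0", "field");
        System.out.println(Arrays.equals(a, b) + " " + Arrays.equals(a, c));
    }
}
```

This layout is also why the excerpt can implement `get(UK userKey)` as a single `backend.db.get(columnFamily, rawKeyBytes)` lookup rather than deserializing a whole map.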
[jira] [Commented] (FLINK-4856) Add MapState for keyed streams
[ https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875320#comment-15875320 ] ASF GitHub Bot commented on FLINK-4856: --- Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3336#discussion_r102127767 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java --- @@ -0,0 +1,579 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteOptions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; + +/** + * {@link MapState} implementation that stores state in RocksDB. + * + * {@link RocksDBStateBackend} must ensure that we set the + * {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since + * we use the {@code merge()} call. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <UK> The type of the keys in the map state. + * @param <UV> The type of the values in the map state. + */ +public class RocksDBMapState<K, N, UK, UV> + extends AbstractRocksDBState<K, N, MapStateDescriptor<UK, UV>, Map<UK, UV>> + implements InternalMapState<N, UK, UV> { + + /** Serializer for the keys and values */ + private final TypeSerializer<UK> userKeySerializer; + private final TypeSerializer<UV> userValueSerializer; + + /** +* We disable writes to the write-ahead-log here. 
We can't have these in the base class +* because JNI segfaults for some reason if they are. +*/ + private final WriteOptions writeOptions; + + /** +* Creates a new {@code RocksDBMapState}. +* +* @param namespaceSerializer The serializer for the namespace. +* @param stateDesc The state identifier for the state. +*/ + public RocksDBMapState(ColumnFamilyHandle columnFamily, + TypeSerializer<N> namespaceSerializer, + MapStateDescriptor<UK, UV> stateDesc, + RocksDBKeyedStateBackend<K> backend) { + + super(columnFamily, namespaceSerializer, stateDesc, backend); + + this.userKeySerializer = stateDesc.getKeySerializer(); + this.userValueSerializer = stateDesc.getValueSerializer(); + + writeOptions = new WriteOptions(); + writeOptions.setDisableWAL(true); + } + + // + // MapState Implementation + // + + @Override + public UV get(UK userKey) throws IOException { + try { +
[jira] [Commented] (FLINK-4856) Add MapState for keyed streams
[ https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875311#comment-15875311 ] ASF GitHub Bot commented on FLINK-4856: --- Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3336#discussion_r102126863 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java --- @@ -0,0 +1,579 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteOptions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; + +/** + * {@link MapState} implementation that stores state in RocksDB. + * + * {@link RocksDBStateBackend} must ensure that we set the + * {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since + * we use the {@code merge()} call. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <UK> The type of the keys in the map state. + * @param <UV> The type of the values in the map state. + */ +public class RocksDBMapState<K, N, UK, UV> + extends AbstractRocksDBState<K, N, MapStateDescriptor<UK, UV>, Map<UK, UV>> + implements InternalMapState<N, UK, UV> { + + /** Serializer for the keys and values */ + private final TypeSerializer<UK> userKeySerializer; + private final TypeSerializer<UV> userValueSerializer; + + /** +* We disable writes to the write-ahead-log here. 
We can't have these in the base class +* because JNI segfaults for some reason if they are. +*/ + private final WriteOptions writeOptions; + + /** +* Creates a new {@code RocksDBMapState}. +* +* @param namespaceSerializer The serializer for the namespace. +* @param stateDesc The state identifier for the state. +*/ + public RocksDBMapState(ColumnFamilyHandle columnFamily, + TypeSerializer<N> namespaceSerializer, + MapStateDescriptor<UK, UV> stateDesc, + RocksDBKeyedStateBackend<K> backend) { + + super(columnFamily, namespaceSerializer, stateDesc, backend); + + this.userKeySerializer = stateDesc.getKeySerializer(); + this.userValueSerializer = stateDesc.getValueSerializer(); + + writeOptions = new WriteOptions(); + writeOptions.setDisableWAL(true); + } + + // + // MapState Implementation + // + + @Override + public UV get(UK userKey) throws IOException { + try { +
[jira] [Commented] (FLINK-4856) Add MapState for keyed streams
[ https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875298#comment-15875298 ]

ASF GitHub Bot commented on FLINK-4856:
---------------------------------------

Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3336#discussion_r102125445

    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java ---

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.contrib.streaming.state;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.util.Preconditions;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteOptions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;

/**
 * {@link MapState} implementation that stores state in RocksDB.
 *
 * {@link RocksDBStateBackend} must ensure that we set the
 * {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since
 * we use the {@code merge()} call.
 *
 * @param <K>  The type of the key.
 * @param <N>  The type of the namespace.
 * @param <UK> The type of the keys in the map state.
 * @param <UV> The type of the values in the map state.
 */
public class RocksDBMapState<K, N, UK, UV>
	extends AbstractRocksDBState<K, N, MapStateDescriptor<UK, UV>, Map<UK, UV>>
	implements InternalMapState<N, UK, UV> {

	/** Serializer for the keys and values. */
	private final TypeSerializer<UK> userKeySerializer;
	private final TypeSerializer<UV> userValueSerializer;

	/**
	 * We disable writes to the write-ahead-log here. We can't have these in the base class
	 * because JNI segfaults for some reason if they are.
	 */
	private final WriteOptions writeOptions;

	/**
	 * Creates a new {@code RocksDBMapState}.
	 *
	 * @param namespaceSerializer The serializer for the namespace.
	 * @param stateDesc The state identifier for the state.
	 */
	public RocksDBMapState(ColumnFamilyHandle columnFamily,
			TypeSerializer<N> namespaceSerializer,
			MapStateDescriptor<UK, UV> stateDesc,
			RocksDBKeyedStateBackend<K> backend) {

		super(columnFamily, namespaceSerializer, stateDesc, backend);

		this.userKeySerializer = stateDesc.getKeySerializer();
		this.userValueSerializer = stateDesc.getValueSerializer();

		writeOptions = new WriteOptions();
		writeOptions.setDisableWAL(true);
	}

	// ------------------------------------------------------------------------
	//  MapState Implementation
	// ------------------------------------------------------------------------

	@Override
	public UV get(UK userKey) throws IOException {
		try {
			byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
			byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);

			return (rawValueBytes == null ? null :
```
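The `serializeUserKeyWithCurrentKeyAndNamespace` call in the diff builds a single composite RocksDB key from the key group, the current key of the keyed stream, the namespace, and the map state's own user key. A minimal self-contained sketch of that idea (the field order illustrates the concept; the method and layout here are assumptions, not Flink's exact wire format):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

class MapStateKeyLayout {

    /**
     * Builds one RocksDB key as <keyGroup><currentKey><namespace><userKey>.
     * Illustrative layout only.
     */
    static byte[] buildCompositeKey(int keyGroup, String currentKey, String namespace, String userKey) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeShort(keyGroup);   // key-group prefix keeps one group's keys contiguous
            out.writeUTF(currentKey);   // the currently set key of the keyed stream
            out.writeUTF(namespace);    // e.g. the window a value belongs to
            out.writeUTF(userKey);      // the map state's own user key
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e); // cannot happen for in-memory streams
        }
    }

    public static void main(String[] args) {
        byte[] a = buildCompositeKey(7, "user-42", "window-1", "clicks");
        byte[] b = buildCompositeKey(7, "user-42", "window-1", "clicks");
        // Deterministic serialization is what makes a plain byte[] lookup in RocksDB work.
        if (!Arrays.equals(a, b)) throw new AssertionError("serialization must be deterministic");
        byte[] c = buildCompositeKey(7, "user-42", "window-1", "views");
        // Keys with the same key group / key / namespace share a common prefix.
        if (a[0] != c[0] || a[1] != c[1]) throw new AssertionError("key-group prefix differs");
        System.out.println("composite key length: " + a.length);
    }
}
```

Because every entry of the map gets its own composite key, `get`, `put`, and `remove` each touch exactly one RocksDB key/value pair.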
[GitHub] flink pull request #3336: [FLINK-4856][state] Add MapState in KeyedState
Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3336#discussion_r102125062

    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java ---

```java
	/**
	 * We disable writes to the write-ahead-log here. We can't have these in the base class
	 * because JNI segfaults for some reason if they are.
	 */
	private final WriteOptions writeOptions;
```

    --- End diff --

    To be honest, I have no idea why we can't put `writeOptions` in the base class. We put it in `AbstractRocksDBState` and have not come across any problem in our production environment. Maybe @aljoscha is more familiar with the problem.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[jira] [Commented] (FLINK-4856) Add MapState for keyed streams
[ https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875297#comment-15875297 ]

ASF GitHub Bot commented on FLINK-4856:
---------------------------------------

Github user shixiaogang commented on a diff in the pull request: https://github.com/apache/flink/pull/3336#discussion_r102125062

> Add MapState for keyed streams
> ------------------------------
>
>                 Key: FLINK-4856
>                 URL: https://issues.apache.org/jira/browse/FLINK-4856
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API, State Backends, Checkpointing
>            Reporter: Xiaogang Shi
>            Assignee: Xiaogang Shi
>
> Many states in keyed streams are organized as key-value pairs. Currently,
> these states are implemented by storing the entire map into a ValueState or a
> ListState. The implementation however is very costly because all entries have
> to be serialized/deserialized when updating a single entry. To improve the
> efficiency of these states, MapStates are urgently needed.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
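The issue description's cost argument can be made concrete: with a map stored in a `ValueState`, one update forces the entire map to be re-serialized, while a `MapState` writes only the touched entry. A toy model of the write amplification (counting entries written instead of actually serializing; not Flink code):

```java
import java.util.HashMap;
import java.util.Map;

class MapStateUpdateCost {

    /** Models ValueState<Map<UK, UV>>: a single put rewrites the whole serialized map. */
    static long entriesWrittenWholeMap(Map<String, String> map, String key, String value) {
        map.put(key, value);
        return map.size(); // every entry is re-serialized
    }

    /** Models MapState<UK, UV>: a put touches exactly one stored key/value pair. */
    static long entriesWrittenPerEntry(Map<String, String> map, String key, String value) {
        map.put(key, value);
        return 1;
    }

    public static void main(String[] args) {
        Map<String, String> state = new HashMap<>();
        for (int i = 0; i < 1000; i++) {
            state.put("k" + i, "v");
        }
        long whole = entriesWrittenWholeMap(state, "k0", "updated");
        long single = entriesWrittenPerEntry(state, "k0", "updated");
        if (whole != 1000 || single != 1) throw new AssertionError();
        System.out.println("whole-map write: " + whole + " entries, map-state write: " + single + " entry");
    }
}
```

For a 1000-entry map the per-entry layout writes 1000x fewer entries on each single-key update, which is the motivation for FLINK-4856.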
[jira] [Commented] (FLINK-5414) Bump up Calcite version to 1.11
[ https://issues.apache.org/jira/browse/FLINK-5414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875296#comment-15875296 ]

ASF GitHub Bot commented on FLINK-5414:
---------------------------------------

Github user haohui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3338#discussion_r102125031

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala ---
    @@ -155,15 +155,15 @@ class ExpressionReductionTest extends TableTestBase {
           "DataSetCalc",
           batchTableNode(0),
           term("select",
    -        "13 AS _c0",
    +        "CAST(13) AS _c0",
    --- End diff --

    Just played around a little bit. I think the problem is that the advanced types are not properly canonized. Using the following diff can pass all tests in `ArrayTypeTest`:

```
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -133,12 +133,18 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl
   override def createTypeWithNullability(
       relDataType: RelDataType,
       nullable: Boolean)
-    : RelDataType = relDataType match {
-      case composite: CompositeRelDataType =>
-        // at the moment we do not care about nullability
-        composite
-      case _ =>
-        super.createTypeWithNullability(relDataType, nullable)
+    : RelDataType = {
+      val t = relDataType match {
+        case composite: CompositeRelDataType =>
+          // at the moment we do not care about nullability
+          composite
+        case array: ArrayRelDataType =>
+          val elementType = canonize(createTypeWithNullability(array.getComponentType, nullable))
+          new ArrayRelDataType(array.typeInfo, elementType, nullable)
+        case _ =>
+          super.createTypeWithNullability(relDataType, nullable)
+      }
+      canonize(t)
   }
 }
```

    GroupWindowTest is still failing as it misses an identity projection. I'm wondering why `ProjectRemoveRule.INSTANCE` did not kick in...

> Bump up Calcite version to 1.11
> -------------------------------
>
>                 Key: FLINK-5414
>                 URL: https://issues.apache.org/jira/browse/FLINK-5414
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Jark Wu
>
> The upcoming Calcite release 1.11 has a lot of stability fixes and new
> features. We should update it for the Table API.
> E.g. we can hopefully merge FLINK-4864

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
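The fix above hinges on `canonize`: the planner compares type objects by identity, so structurally equal types must be interned to a single canonical instance. A generic sketch of that interning pattern (toy type class; this is the concept behind Calcite's type factory, not its actual implementation):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class TypeCanonizer {

    /** A toy type descriptor with structural equality, like ArrayRelDataType. */
    static final class ArrayType {
        final String elementType;
        final boolean nullable;

        ArrayType(String elementType, boolean nullable) {
            this.elementType = elementType;
            this.nullable = nullable;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof ArrayType
                && ((ArrayType) o).elementType.equals(elementType)
                && ((ArrayType) o).nullable == nullable;
        }

        @Override
        public int hashCode() {
            return Objects.hash(elementType, nullable);
        }
    }

    private final Map<ArrayType, ArrayType> cache = new HashMap<>();

    /** Returns the canonical instance, so identity (==) checks in the planner hold. */
    ArrayType canonize(ArrayType t) {
        return cache.computeIfAbsent(t, k -> k);
    }

    public static void main(String[] args) {
        TypeCanonizer factory = new TypeCanonizer();
        ArrayType a = factory.canonize(new ArrayType("INT", true));
        ArrayType b = factory.canonize(new ArrayType("INT", true));
        // Two structurally equal types collapse to the same object.
        if (a != b) throw new AssertionError("canonize must return the same instance");
        System.out.println("canonized to one instance");
    }
}
```

If a newly constructed type (like the patched `ArrayRelDataType`) is never passed through `canonize`, identity checks fail even though the type is structurally correct, which matches the symptom described in the comment.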
[jira] [Updated] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder
[ https://issues.apache.org/jira/browse/FLINK-5860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

shijinkui updated FLINK-5860:
-----------------------------

Description:

Searching for `System.getProperty("java.io.tmpdir")` in the whole Flink project yields the list of unit tests below. Replace all the file creation from `java.io.tmpdir` with TemporaryFolder. Who can fix this problem thoroughly?

```
$ grep -ri 'System.getProperty("java.io.tmpdir")' .
./flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java: env.setStateBackend(new FsStateBackend("file:///" + System.getProperty("java.io.tmpdir") + "/flink/backend"));
./flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java: return getMockEnvironment(new File[] { new File(System.getProperty("java.io.tmpdir")) });
./flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java: public static final String DEFAULT_TASK_MANAGER_TMP_PATH = System.getProperty("java.io.tmpdir");
./flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java: final String tempPath = System.getProperty("java.io.tmpdir");
./flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java: final File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java: final String outDir = params.get("output", System.getProperty("java.io.tmpdir"));
./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java: final String tmpDir = System.getProperty("java.io.tmpdir");
./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java: final String outPath = System.getProperty("java.io.tmpdir");
./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java: public static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "flink_plan";
./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java: public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + File.separator + "flink_data";
./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java: FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + File.separator + "flink";
./flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java: baseDir = new File(System.getProperty("java.io.tmpdir"));
./flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java: return System.getProperty("java.io.tmpdir");
./flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java: System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
```
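The ticket's suggested replacement is JUnit's `TemporaryFolder` rule. Outside a JUnit rule, the same per-test isolation can be sketched with the JDK's `Files.createTempDirectory`, which creates a unique directory per invocation instead of letting all tests collide in the shared `java.io.tmpdir` (a minimal sketch of the idea, not the actual Flink change):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

class IsolatedTempDir {

    /** Unique per-test directory, unlike the shared System.getProperty("java.io.tmpdir"). */
    static Path newTempDir() {
        try {
            return Files.createTempDirectory("flink-test-");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = newTempDir();
        Path jar = dir.resolve("jarcreatortest.jar");
        Files.write(jar, new byte[] {1, 2, 3});

        if (!Files.exists(jar)) throw new AssertionError();

        // Explicit cleanup; JUnit's TemporaryFolder rule does this automatically after each test.
        Files.delete(jar);
        Files.delete(dir);
        System.out.println("isolated temp dir created and cleaned up");
    }
}
```

Two parallel runs of the same test then write to different directories, which is exactly the leftover-file and cross-test interference problem the issue is about.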
[jira] [Created] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder
shijinkui created FLINK-5860:
--------------------------------

             Summary: Replace all the file creating from java.io.tmpdir with TemporaryFolder
                 Key: FLINK-5860
                 URL: https://issues.apache.org/jira/browse/FLINK-5860
             Project: Flink
          Issue Type: Test
          Components: Tests
            Reporter: shijinkui
[jira] [Commented] (FLINK-5830) OutOfMemoryError during notify final state in TaskExecutor may cause job stuck
[ https://issues.apache.org/jira/browse/FLINK-5830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875284#comment-15875284 ]

ASF GitHub Bot commented on FLINK-5830:
---------------------------------------

Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/3360

    @StephanEwen , thank you for the quick reviews! That is a good idea to add the uniform way in the utils, so we can use that anywhere. I will fix it as you suggested later today.

> OutOfMemoryError during notify final state in TaskExecutor may cause job stuck
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-5830
>                 URL: https://issues.apache.org/jira/browse/FLINK-5830
>             Project: Flink
>          Issue Type: Bug
>            Reporter: zhijiang
>            Assignee: zhijiang
>
> The scenario is like this:
> {{JobMaster}} tries to cancel all the executions when processing the failed
> execution, and the task executor has already acknowledged the cancel rpc message.
> When notifying the final state in {{TaskExecutor}}, it causes OOM in
> {{AkkaRpcActor}} and this error is caught and merely logged. The final state
> will not be sent any more.
> The {{JobMaster}} can not receive the final state and trigger the restart
> strategy.
> One solution is to catch the {{OutOfMemoryError}} and rethrow it; that will
> shut down the {{ActorSystem}}, resulting in exiting the
> {{TaskExecutor}}. The {{JobMaster}} can then be notified of the {{TaskExecutor}}
> failure and fail all the tasks to trigger a restart successfully.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
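The proposed fix is to stop swallowing `OutOfMemoryError` in the RPC actor's catch-all and let it propagate so the process terminates. A sketch of the "rethrow if JVM-fatal" pattern being discussed (names here are illustrative; Flink keeps a similar helper in its own `ExceptionUtils`):

```java
class FatalErrors {

    /** Errors after which the JVM state is not trustworthy and the process must exit. */
    static boolean isJvmFatalError(Throwable t) {
        return t instanceof OutOfMemoryError
            || t instanceof InternalError
            || t instanceof UnknownError;
    }

    /** Pattern for a catch-all message handler: log ordinary failures, rethrow fatal ones. */
    static void handleMessageFailure(Throwable t) {
        if (isJvmFatalError(t)) {
            // Propagates up, shutting down the actor system and hence the TaskExecutor.
            throw (Error) t;
        }
        System.err.println("message handling failed, notifying sender: " + t);
    }

    public static void main(String[] args) {
        handleMessageFailure(new IllegalStateException("recoverable")); // logged only
        boolean propagated = false;
        try {
            handleMessageFailure(new OutOfMemoryError("simulated"));
        } catch (OutOfMemoryError e) {
            propagated = true;
        }
        if (!propagated) throw new AssertionError("fatal error must propagate");
        System.out.println("fatal error propagated");
    }
}
```

With this split, the JobMaster no longer hangs waiting for a final-state message that can never arrive: the TaskExecutor dies visibly and the restart strategy kicks in.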
[jira] [Commented] (FLINK-5836) Race condition between slot offering and task deployment
[ https://issues.apache.org/jira/browse/FLINK-5836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875280#comment-15875280 ]

ASF GitHub Bot commented on FLINK-5836:
---------------------------------------

GitHub user wenlong88 opened a pull request:

    https://github.com/apache/flink/pull/3371

    [FLINK-5836] Fix race condition between offer slot and submit task

    The solution is the same as what Till described in the JIRA: activating the slots when reserving them on `TaskExecutor`, before offering them to the `JobManager`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wenlong88/flink jira-5836

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3371.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3371

commit 35042f29e055a7f83b7c4d79e4c72673711dfd78
Author: wenlong.lwl
Date:   2017-01-06T08:32:08Z

    Fix race condition between offer slot and submit task

> Race condition between slot offering and task deployment
> ---------------------------------------------------------
>
>                 Key: FLINK-5836
>                 URL: https://issues.apache.org/jira/browse/FLINK-5836
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.3.0
>            Reporter: Till Rohrmann
>            Assignee: Wenlong Lyu
>              Labels: flip-6
>
> The Flip-6 code has a race condition when offering slots to a {{JobManager}}
> which directly deploys tasks to the offered slots. In such a situation it is
> possible that the deploy call overtakes the acknowledge message for the slot
> offering. As a result, the slots are not marked yet as active and the
> deployment will fail.
> I propose to fix this problem by first activating all offered slots before
> sending the slot offer message to the {{JobManager}}. Consequently, we'll
> deactivate and free slots which haven't been accepted by the {{JobManager}}
> once we've received the offering acknowledge message.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[GitHub] flink pull request #3371: [FLINK-5836] Fix race condition between offer slot...
GitHub user wenlong88 opened a pull request: https://github.com/apache/flink/pull/3371 [FLINK-5836] Fix race condition between offer slot and submit task The solution is the same as what Till described in the JIRA: activating the slots when reserving them on `TaskExecutor`, before offering them to the `JobManager`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/wenlong88/flink jira-5836 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3371.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3371 commit 35042f29e055a7f83b7c4d79e4c72673711dfd78 Author: wenlong.lwl Date: 2017-01-06T08:32:08Z Fix race condition between offer slot and submit task --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
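Editor's sketch of the fix the PR describes: slots are marked active at reservation time, before the offer is sent, so a deploy call that overtakes the offer acknowledgement still finds an active slot. All class and method names below are illustrative, not Flink's actual `TaskExecutor` API.

```java
import java.util.HashMap;
import java.util.Map;

// Models the ordering fix: reserve-and-activate first, offer second.
class SlotTableSketch {
    enum State { ALLOCATED, ACTIVE }

    private final Map<Integer, State> slots = new HashMap<>();

    // Reserve a slot for a job and immediately mark it ACTIVE,
    // *before* the slot offer is sent to the JobManager.
    void reserveAndActivate(int slotIndex) {
        slots.put(slotIndex, State.ACTIVE);
    }

    // A deployment succeeds only on an active slot; with activation done
    // up front, an early deploy call no longer fails.
    boolean tryDeploy(int slotIndex) {
        return slots.get(slotIndex) == State.ACTIVE;
    }

    // Slots rejected in the offer acknowledgement are freed again.
    void freeRejected(int slotIndex) {
        slots.remove(slotIndex);
    }
}
```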
[jira] [Created] (FLINK-5859) support partition pruning on Table API & SQL
godfrey he created FLINK-5859: - Summary: support partition pruning on Table API & SQL Key: FLINK-5859 URL: https://issues.apache.org/jira/browse/FLINK-5859 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: godfrey he Assignee: godfrey he Many data sources are partitionable storage, e.g. HDFS, Druid. Many queries need to read only a small subset of the total data. We can use partition information to prune or skip over files irrelevant to the user’s queries. Both query optimization time and execution time can be reduced significantly, especially for large partitioned tables. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
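Editor's sketch of the idea behind the feature request: given the partition-key value for each partition path and a predicate pushed down from the query's WHERE clause, keep only the partitions that can contain matching rows. Names are illustrative, not the Table API.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class PartitionPruner {
    // Keep only partition paths whose partition-key value satisfies the
    // predicate pushed down from the query; pruned paths are never read.
    static List<String> prune(Map<String, Integer> partitionsByPath,
                              Predicate<Integer> pushedDownFilter) {
        return partitionsByPath.entrySet().stream()
                .filter(e -> pushedDownFilter.test(e.getValue()))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
```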
[jira] [Commented] (FLINK-5546) java.io.tmpdir setted as project build directory in surefire plugin
[ https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875277#comment-15875277 ] ASF GitHub Bot commented on FLINK-5546: --- Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3190 Hi @StephanEwen, I have re-submitted this pull request based on the current master branch, which has merged FLINK-5817. > java.io.tmpdir setted as project build directory in surefire plugin > --- > > Key: FLINK-5546 > URL: https://issues.apache.org/jira/browse/FLINK-5546 > Project: Flink > Issue Type: Sub-task > Components: Build System > Environment: CentOS 7.2 >Reporter: Syinchwun Leo >Assignee: shijinkui > Fix For: 1.2.1 > > > When multiple Linux users run tests at the same time, the flink-runtime module may > fail. User A creates /tmp/cacheFile, and User B will have no permission to > visit the folder. > Failed tests: > FileCacheDeleteValidationTest.setup:79 Error initializing the test: > /tmp/cacheFile (Permission denied) > Tests in error: > IOManagerTest.channelEnumerator:54 » Runtime Could not create storage > director... > Tests run: 1385, Failures: 1, Errors: 1, Skipped: 8 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3190: [FLINK-5546][build] java.io.tmpdir setted as project buil...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3190 Hi @StephanEwen, I have re-submitted this pull request based on the current master branch, which has merged FLINK-5817. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
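Editor's note: the change the issue title describes amounts to pointing java.io.tmpdir at the per-module build directory in the Surefire configuration, so each build gets its own temp directory instead of sharing /tmp across users. A minimal sketch (plugin placement and surrounding pom structure are assumptions, not the actual Flink pom):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-surefire-plugin</artifactId>
  <configuration>
    <systemPropertyVariables>
      <!-- tests write temp files under target/ instead of the shared /tmp -->
      <java.io.tmpdir>${project.build.directory}</java.io.tmpdir>
    </systemPropertyVariables>
  </configuration>
</plugin>
```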
[jira] [Commented] (FLINK-5795) Improve "UDTF" to support constructor with parameter.
[ https://issues.apache.org/jira/browse/FLINK-5795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875273#comment-15875273 ] ASF GitHub Bot commented on FLINK-5795: --- Github user wuchong commented on the issue: https://github.com/apache/flink/pull/3330 Looks good to me. Wait for another committer +1 > Improve "UDTF" to support constructor with parameter. > - > > Key: FLINK-5795 > URL: https://issues.apache.org/jira/browse/FLINK-5795 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3330: [FLINK-5795][TableAPI] Improve UDTF to support constr...
Github user wuchong commented on the issue: https://github.com/apache/flink/pull/3330 Looks good to me. Wait for another committer +1 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-5858) Support multiple sinks in same execution DAG
[ https://issues.apache.org/jira/browse/FLINK-5858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] godfrey he updated FLINK-5858: -- Description: When the writeToSink method is called to write a Table (backed by a TableSource) to a TableSink, the Table is translated into a DataSet or DataStream. If writeToSink is called more than once (writing to different sinks), the Table is translated more than once, and the final execution graph is split into separate DAGs. For example:
{code:title=Example.scala|borderStyle=solid}
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val csvTableSource = new CsvTableSource(
  "/tmp/words",
  Array("first", "id", "score", "last"),
  Array(
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.DOUBLE_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO
  ),
  fieldDelim = "#"
)
tEnv.registerTableSource("csv_source", csvTableSource)

val resultTable = tEnv.scan("csv_source")
  .groupBy('first)
  .select('first, 'score.sum)

resultTable.writeToSink(new CsvTableSink("/tmp/wordcount1"))
resultTable.writeToSink(new CsvTableSink("/tmp/wordcount2"))
println(tEnv.explain(resultTable))
{code}
Results:
== Abstract Syntax Tree ==
LogicalProject(first=[$0], TMP_1=[$1])
  LogicalAggregate(group=[{0}], TMP_0=[SUM($1)])
    LogicalProject(first=[$0], score=[$2])
      LogicalTableScan(table=[[csv_source]])

== Optimized Logical Plan ==
DataSetAggregate(groupBy=[first], select=[first, SUM(score) AS TMP_0])
  BatchTableSourceScan(table=[[csv_source]], fields=[first, score])

== Physical Execution Plan ==
{color:red}Stage 6 : Data Source{color}
  content : collect elements with CollectionInputFormat
  Partitioning : RANDOM_PARTITIONED

  Stage 5 : Map
    content : prepare select: (first, SUM(score) AS TMP_0)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

  Stage 4 : GroupCombine
    content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Sorted Combine
    Partitioning : RANDOM_PARTITIONED

  Stage 3 : GroupReduce
    content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
    ship_strategy : Hash Partition on [0]
    exchange_mode : PIPELINED
    driver_strategy : Sorted Group Reduce
    Partitioning : RANDOM_PARTITIONED

  Stage 2 : Map
    content : to: Row(f0: String, f1: Double)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

  Stage 1 : Map
    content : Map at emitDataSet(CsvTableSink.scala:67)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

  Stage 0 : Data Sink
    content : TextOutputFormat (/tmp/wordcount1) - UTF-8
    ship_strategy : Forward
    exchange_mode : PIPELINED
    Partitioning : RANDOM_PARTITIONED

{color:red}Stage 13 : Data Source{color}
  content : collect elements with CollectionInputFormat
  Partitioning : RANDOM_PARTITIONED

  Stage 12 : Map
    content : prepare select: (first, SUM(score) AS TMP_0)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

  Stage 11 : GroupCombine
    content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Sorted Combine
    Partitioning : RANDOM_PARTITIONED

  Stage 10 :
[jira] [Created] (FLINK-5858) Support multiple sinks in same execution DAG
godfrey he created FLINK-5858: - Summary: Support multiple sinks in same execution DAG Key: FLINK-5858 URL: https://issues.apache.org/jira/browse/FLINK-5858 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: godfrey he When the writeToSink method is called to write a Table (backed by a TableSource) to a TableSink, the Table is translated into a DataSet or DataStream. If writeToSink is called more than once (writing to different sinks), the Table is translated more than once, and the final execution graph is split into separate DAGs. For example:
{code:title=Example.scala|borderStyle=solid}
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val csvTableSource = new CsvTableSource(
  "/tmp/words",
  Array("first", "id", "score", "last"),
  Array(
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.DOUBLE_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO
  ),
  fieldDelim = "#"
)
tEnv.registerTableSource("csv_source", csvTableSource)

val resultTable = tEnv.scan("csv_source")
  .groupBy('first)
  .select('first, 'score.sum)

resultTable.writeToSink(new CsvTableSink("/tmp/wordcount1"))
resultTable.writeToSink(new CsvTableSink("/tmp/wordcount2"))
println(tEnv.explain(resultTable))
{code}
Result:
== Abstract Syntax Tree ==
LogicalProject(first=[$0], TMP_1=[$1])
  LogicalAggregate(group=[{0}], TMP_0=[SUM($1)])
    LogicalProject(first=[$0], score=[$2])
      LogicalTableScan(table=[[csv_source]])

== Optimized Logical Plan ==
DataSetAggregate(groupBy=[first], select=[first, SUM(score) AS TMP_0])
  BatchTableSourceScan(table=[[csv_source]], fields=[first, score])

== Physical Execution Plan ==
{color:red}Stage 6 : Data Source{color}
  content : collect elements with CollectionInputFormat
  Partitioning : RANDOM_PARTITIONED

  Stage 5 : Map
    content : prepare select: (first, SUM(score) AS TMP_0)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

  Stage 4 : GroupCombine
    content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Sorted Combine
    Partitioning : RANDOM_PARTITIONED

  Stage 3 : GroupReduce
    content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
    ship_strategy : Hash Partition on [0]
    exchange_mode : PIPELINED
    driver_strategy : Sorted Group Reduce
    Partitioning : RANDOM_PARTITIONED

  Stage 2 : Map
    content : to: Row(f0: String, f1: Double)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

  Stage 1 : Map
    content : Map at emitDataSet(CsvTableSink.scala:67)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

  Stage 0 : Data Sink
    content : TextOutputFormat (/tmp/wordcount1) - UTF-8
    ship_strategy : Forward
    exchange_mode : PIPELINED
    Partitioning : RANDOM_PARTITIONED

{color:red}Stage 13 : Data Source{color}
  content : collect elements with CollectionInputFormat
  Partitioning : RANDOM_PARTITIONED

  Stage 12 : Map
    content : prepare select: (first, SUM(score) AS TMP_0)
    ship_strategy : Forward
    exchange_mode : PIPELINED
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

  Stage 11 : GroupCombine
    content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
    ship_strategy : Forward
    exchange_mode
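Editor's sketch of the improvement the issue asks for: cache the translated plan per logical table so that writing the same Table to several sinks reuses one translation (and so one shared DAG) instead of re-translating each time. All names here are illustrative, not the Table API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Memoizes translation results keyed by the logical plan, so a second
// writeToSink on the same Table hits the cache rather than building a
// second, disconnected DAG.
class TranslationCache {
    private final Map<String, String> translated = new HashMap<>();
    final AtomicInteger translations = new AtomicInteger();

    String translate(String logicalPlan, Function<String, String> translator) {
        return translated.computeIfAbsent(logicalPlan, p -> {
            translations.incrementAndGet();  // counts real translations only
            return translator.apply(p);
        });
    }
}
```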
[jira] [Created] (FLINK-5857) Recycle idle containers in time for yarn mode
shuai.xu created FLINK-5857: --- Summary: Recycle idle containers in time for yarn mode Key: FLINK-5857 URL: https://issues.apache.org/jira/browse/FLINK-5857 Project: Flink Issue Type: Bug Components: YARN Reporter: shuai.xu Assignee: shuai.xu When we run a Flink batch job in a map-reduce style, the container for a map may sit idle for a long time after the map finishes. We need a strategy to recycle these containers in time to reduce resource usage. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
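Editor's sketch of one possible recycling strategy for the issue above: release any container that has stayed idle longer than a timeout. Timestamps are passed in explicitly to keep the sketch deterministic; all names are illustrative, not the YARN or Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class IdleContainerRecycler {
    private final long idleTimeoutMillis;
    private final Map<String, Long> idleSince = new HashMap<>();

    IdleContainerRecycler(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // Record that a container became idle at time `now` (millis).
    void markIdle(String containerId, long now) {
        idleSince.put(containerId, now);
    }

    // Return ids of containers idle for at least the timeout at time `now`;
    // the caller would release these back to YARN.
    List<String> recycle(long now) {
        List<String> released = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = idleSince.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (now - e.getValue() >= idleTimeoutMillis) {
                released.add(e.getKey());
                it.remove();
            }
        }
        return released;
    }
}
```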
[jira] [Comment Edited] (FLINK-5836) Race condition between slot offering and task deployment
[ https://issues.apache.org/jira/browse/FLINK-5836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875234#comment-15875234 ] Biao Liu edited comment on FLINK-5836 at 2/21/17 2:26 AM: -- [~wenlong.lwl] has already been working on it, reassign to him. was (Author: sleepy): [~wenlong.lwl] was already working on this, reassign to him. > Race condition between slot offering and task deployment > > > Key: FLINK-5836 > URL: https://issues.apache.org/jira/browse/FLINK-5836 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.3.0 >Reporter: Till Rohrmann >Assignee: Wenlong Lyu > Labels: flip-6 > > The Flip-6 code has a race condition when offering slots to a {{JobManager}} > which directly deploys tasks to the offered slots. In such a situation it is > possible that the deploy call overtakes the acknowledge message for the slot > offering. As a result, the slots are not marked yet as active and the > deployment will fail. > I propose to fix this problem by first activating all offered slots before > sending the slot offer message to the {{JobManager}}. Consequently, we'll > deactivate and free slots which haven't been accepted by the {{JobManager}} > once we've received the offering acknowledge message. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-5856) Need return redundant containers to yarn for yarn mode
shuai.xu created FLINK-5856: --- Summary: Need return redundant containers to yarn for yarn mode Key: FLINK-5856 URL: https://issues.apache.org/jira/browse/FLINK-5856 Project: Flink Issue Type: Bug Components: YARN Reporter: shuai.xu Assignee: shuai.xu In Flink's YARN mode, the RM requests containers from YARN according to the JM's requirements. But the AMRMClientAsync used with YARN doesn't guarantee that the number of containers returned exactly equals the number requested. So the Flink RM needs to record the number it requested and return the redundant containers to YARN. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
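Editor's sketch of the bookkeeping the issue describes: record how many containers were actually requested and hand back any surplus that the allocation callback delivers. Names are illustrative only, not the AMRMClientAsync API.

```java
import java.util.ArrayList;
import java.util.List;

class ContainerLedger {
    private int outstandingRequests;

    void request(int n) {
        outstandingRequests += n;
    }

    // Splits an allocation into containers to keep and containers to give
    // back: anything beyond the outstanding request count is redundant.
    List<String> accept(List<String> allocated, List<String> toRelease) {
        List<String> kept = new ArrayList<>();
        for (String c : allocated) {
            if (outstandingRequests > 0) {
                outstandingRequests--;
                kept.add(c);
            } else {
                toRelease.add(c);  // redundant container, return it to YARN
            }
        }
        return kept;
    }
}
```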
[jira] [Commented] (FLINK-5836) Race condition between slot offering and task deployment
[ https://issues.apache.org/jira/browse/FLINK-5836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875234#comment-15875234 ] Biao Liu commented on FLINK-5836: - [~wenlong.lwl] was already working on this, reassign to him. > Race condition between slot offering and task deployment > > > Key: FLINK-5836 > URL: https://issues.apache.org/jira/browse/FLINK-5836 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.3.0 >Reporter: Till Rohrmann >Assignee: Biao Liu > Labels: flip-6 > > The Flip-6 code has a race condition when offering slots to a {{JobManager}} > which directly deploys tasks to the offered slots. In such a situation it is > possible that the deploy call overtakes the acknowledge message for the slot > offering. As a result, the slots are not marked yet as active and the > deployment will fail. > I propose to fix this problem by first activating all offered slots before > sending the slot offer message to the {{JobManager}}. Consequently, we'll > deactivate and free slots which haven't been accepted by the {{JobManager}} > once we've received the offering acknowledge message. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-5836) Race condition between slot offering and task deployment
[ https://issues.apache.org/jira/browse/FLINK-5836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Biao Liu reassigned FLINK-5836: --- Assignee: Wenlong Lyu (was: Biao Liu) > Race condition between slot offering and task deployment > > > Key: FLINK-5836 > URL: https://issues.apache.org/jira/browse/FLINK-5836 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.3.0 >Reporter: Till Rohrmann >Assignee: Wenlong Lyu > Labels: flip-6 > > The Flip-6 code has a race condition when offering slots to a {{JobManager}} > which directly deploys tasks to the offered slots. In such a situation it is > possible that the deploy call overtakes the acknowledge message for the slot > offering. As a result, the slots are not marked yet as active and the > deployment will fail. > I propose to fix this problem by first activating all offered slots before > sending the slot offer message to the {{JobManager}}. Consequently, we'll > deactivate and free slots which haven't been accepted by the {{JobManager}} > once we've received the offering acknowledge message. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5441) Directly allow SQL queries on a Table
[ https://issues.apache.org/jira/browse/FLINK-5441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875232#comment-15875232 ] ASF GitHub Bot commented on FLINK-5441: --- Github user wuchong commented on the issue: https://github.com/apache/flink/pull/3107 Hi @twalthr, the `env.sql(s"SELECT * FROM $table JOIN $otherTable")` is a nice way. But how do we handle the table's name and register it with the env? > Directly allow SQL queries on a Table > - > > Key: FLINK-5441 > URL: https://issues.apache.org/jira/browse/FLINK-5441 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Jark Wu > > Right now a user has to register a table before it can be used in SQL > queries. In order to allow more fluent programming we propose calling SQL > directly on a table. An underscore can be used to reference the current table: > {code} > myTable.sql("SELECT a, b, c FROM _ WHERE d = 12") > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3107: [FLINK-5441] [table] Directly allow SQL queries on a Tabl...
Github user wuchong commented on the issue: https://github.com/apache/flink/pull/3107 Hi @twalthr, the `env.sql(s"SELECT * FROM $table JOIN $otherTable")` is a nice way. But how do we handle the table's name and register it with the env? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
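Editor's sketch of one possible answer to the question above: give each Table a generated, unique name, register it on first use, and splice that name into the SQL string. Everything here is hypothetical, not the actual Table API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

class SqlOnTableSketch {
    private final Map<String, Object> catalog = new HashMap<>();
    private final AtomicLong counter = new AtomicLong();

    // Register the table under a fresh generated name and return that name.
    String register(Object table) {
        String name = "_tmp_table_" + counter.incrementAndGet();
        catalog.put(name, table);
        return name;
    }

    // `template` carries a "%s" placeholder where the table name belongs,
    // standing in for the `_` / `$table` placeholder in the proposal.
    String sqlFor(Object table, String template) {
        return String.format(template, register(table));
    }
}
```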
[jira] [Commented] (FLINK-5414) Bump up Calcite version to 1.11
[ https://issues.apache.org/jira/browse/FLINK-5414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875207#comment-15875207 ] ASF GitHub Bot commented on FLINK-5414: --- Github user haohui commented on a diff in the pull request: https://github.com/apache/flink/pull/3338#discussion_r102117236
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala ---
@@ -155,15 +155,15 @@ class ExpressionReductionTest extends TableTestBase {
 "DataSetCalc",
 batchTableNode(0),
 term("select",
-"13 AS _c0",
+"CAST(13) AS _c0",
--- End diff --
Is it possible to not change the default nullability while adopting Calcite 1.11? Let me try it out as well. > Bump up Calcite version to 1.11 > --- > > Key: FLINK-5414 > URL: https://issues.apache.org/jira/browse/FLINK-5414 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Jark Wu > > The upcoming Calcite release 1.11 has a lot of stability fixes and new > features. We should update it for the Table API. > E.g. we can hopefully merge FLINK-4864 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3338: [FLINK-5414] [table] Bump up Calcite version to 1....
Github user haohui commented on a diff in the pull request: https://github.com/apache/flink/pull/3338#discussion_r102117236
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala ---
@@ -155,15 +155,15 @@ class ExpressionReductionTest extends TableTestBase {
 "DataSetCalc",
 batchTableNode(0),
 term("select",
-"13 AS _c0",
+"CAST(13) AS _c0",
--- End diff --
Is it possible to not change the default nullability while adopting Calcite 1.11? Let me try it out as well. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4326) Flink start-up scripts should optionally start services on the foreground
[ https://issues.apache.org/jira/browse/FLINK-4326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875143#comment-15875143 ] Michi Mutsuzaki commented on FLINK-4326: flink-console.sh doesn't define log_setting, and I'm getting a warning like this:
{noformat}
% ./bin/flink-console.sh jobmanager --configDir conf --executionMode cluster
Starting jobmanager as a console application on host x1.
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
{noformat}
Maybe it makes sense to provide log4j/logback config files that log to the console, and define log_setting to point to these config files? > Flink start-up scripts should optionally start services on the foreground > - > > Key: FLINK-4326 > URL: https://issues.apache.org/jira/browse/FLINK-4326 > Project: Flink > Issue Type: Improvement > Components: Startup Shell Scripts >Affects Versions: 1.0.3 >Reporter: Elias Levy > > This has previously been mentioned in the mailing list, but has not been > addressed. Flink start-up scripts start the job and task managers in the > background. This makes it difficult to integrate Flink with most process > supervision tools and init systems, including Docker. One can get around > this via hacking the scripts or manually starting the right classes via Java, > but it is a brittle solution. > In addition to starting the daemons in the foreground, the start-up scripts > should use exec instead of running the commands, so as to avoid forks. Many > supervisory tools assume the PID of the process to be monitored is that of > the process it first executes, and fork chains make it difficult for the > supervisor to figure out what process to monitor. Specifically, > jobmanager.sh and taskmanager.sh should exec flink-daemon.sh, and > flink-daemon.sh should exec java.
-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4422) Convert all time interval measurements to System.nanoTime()
[ https://issues.apache.org/jira/browse/FLINK-4422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875127#comment-15875127 ] Jin Mingjian commented on FLINK-4422: - [~StephanEwen] thanks for re-assigning. Your suggestion is great. PRs will come soon :) > Convert all time interval measurements to System.nanoTime() > --- > > Key: FLINK-4422 > URL: https://issues.apache.org/jira/browse/FLINK-4422 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: Stephan Ewen >Assignee: Jin Mingjian >Priority: Minor > > In contrast to {{System.currentTimeMillis()}}, {{System.nanoTime()}} is > monotonic. To measure delays and time intervals, {{System.nanoTime()}} is > hence reliable, while {{System.currentTimeMillis()}} is not. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
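Editor's sketch of the measurement pattern the issue advocates: time an interval with the monotonic System.nanoTime() rather than System.currentTimeMillis(), which can jump when the wall clock is adjusted.

```java
// Minimal stopwatch built on the monotonic clock; only *differences* of
// nanoTime() values are meaningful, never absolute values.
class Stopwatch {
    private final long start = System.nanoTime();

    long elapsedMillis() {
        return (System.nanoTime() - start) / 1_000_000L;
    }
}
```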
[jira] [Assigned] (FLINK-5692) Add an Option to Deactivate Kryo Fallback for Serializers
[ https://issues.apache.org/jira/browse/FLINK-5692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jin Mingjian reassigned FLINK-5692: --- Assignee: Jin Mingjian > Add an Option to Deactivate Kryo Fallback for Serializers > - > > Key: FLINK-5692 > URL: https://issues.apache.org/jira/browse/FLINK-5692 > Project: Flink > Issue Type: New Feature > Components: Type Serialization System >Affects Versions: 1.2.0 >Reporter: Stephan Ewen >Assignee: Jin Mingjian > Labels: easyfix, starter > > Some users want to avoid that Flink's serializers use Kryo, as it can easily > become a hotspot in serialization. > For those users, it would help if there is a flag to "deactivate generic > types". Those users could then see where types are used that default to Kryo > and change these types (make them PoJos, Value types, or write custom > serializers). > There are two ways to approach that: > 1. (Simple) Make {{GenericTypeInfo}} throw an exception whenever it would > create a Kryo Serializer (when the respective flag is set in the > {{ExecutionConfig}}) > 2. Have a static flag on the {{TypeExtractor}} to throw an exception > whenever it would create a {{GenericTypeInfo}}. This approach has the > downside of introducing some static configuration to the TypeExtractor, but > may be more helpful because it throws exceptions in the programs at points > where the types are used (not where the serializers are created, which may be > much later). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3302: [FLINK-5710] Add ProcTime() function to indicate StreamSQ...
Github user haohui commented on the issue: https://github.com/apache/flink/pull/3302 FYI: #3370 is the commit we use internally for this feature. Please feel free to take it if it helps implementing this PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5710) Add ProcTime() function to indicate StreamSQL
[ https://issues.apache.org/jira/browse/FLINK-5710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875061#comment-15875061 ] ASF GitHub Bot commented on FLINK-5710: --- Github user haohui commented on the issue: https://github.com/apache/flink/pull/3302 FYI: #3370 is the commit we use internally for this feature. Please feel free to take it if it helps implementing this PR. > Add ProcTime() function to indicate StreamSQL > - > > Key: FLINK-5710 > URL: https://issues.apache.org/jira/browse/FLINK-5710 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Stefano Bortoli >Assignee: Stefano Bortoli >Priority: Minor > > procTime() is a parameterless scalar function that just indicates processing > time mode -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5710) Add ProcTime() function to indicate StreamSQL
[ https://issues.apache.org/jira/browse/FLINK-5710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875060#comment-15875060 ] ASF GitHub Bot commented on FLINK-5710: --- GitHub user haohui opened a pull request: https://github.com/apache/flink/pull/3370 [FLINK-5710] Add ProcTime() function to indicate StreamSQL. This is the commit we used internally -- there are no unit tests associated with this PR. It simply serves as a reference point for #3302. You can merge this pull request into a Git repository by running: $ git pull https://github.com/haohui/flink FLINK-5710 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3370.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3370 commit 7aaa5008c7b49ce48e01d40dc4a04a6211eaf79b Author: Haohui Mai Date: 2017-02-20T21:13:58Z [FLINK-5710] Add ProcTime() function to indicate StreamSQL. > Add ProcTime() function to indicate StreamSQL > - > > Key: FLINK-5710 > URL: https://issues.apache.org/jira/browse/FLINK-5710 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Stefano Bortoli >Assignee: Stefano Bortoli >Priority: Minor > > procTime() is a parameterless scalar function that just indicates processing > time mode -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3370: [FLINK-5710] Add ProcTime() function to indicate S...
GitHub user haohui opened a pull request: https://github.com/apache/flink/pull/3370 [FLINK-5710] Add ProcTime() function to indicate StreamSQL. This is the commit we used internally -- There are no unit tests associated with this PR. It simply serves as a reference point for #3302. You can merge this pull request into a Git repository by running: $ git pull https://github.com/haohui/flink FLINK-5710 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3370.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3370 commit 7aaa5008c7b49ce48e01d40dc4a04a6211eaf79b Author: Haohui Mai Date: 2017-02-20T21:13:58Z [FLINK-5710] Add ProcTime() function to indicate StreamSQL. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink
Ted Yu created FLINK-5855: - Summary: Unprotected access to pendingFilesPerCheckpoint in BucketingSink Key: FLINK-5855 URL: https://issues.apache.org/jira/browse/FLINK-5855 Project: Flink Issue Type: Bug Reporter: Ted Yu Priority: Minor
{code}
handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);

synchronized (restoredState.pendingFilesPerCheckpoint) {
  restoredState.pendingFilesPerCheckpoint.clear();
{code}
Lock on pendingFilesPerCheckpoint should be obtained prior to the call to handlePendingFilesForPreviousCheckpoints().
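The locking order Ted suggests can be sketched as below. This is a hedged illustration, not the actual BucketingSink code: the class name, the map's value type, and the handler body are hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the suggested fix: take the monitor on pendingFilesPerCheckpoint
// BEFORE calling the handler that iterates it, so no other thread can mutate
// the map between the read and the clear().
class RestoredStateSketch {
    // checkpoint id -> pending file path (simplified stand-in types)
    final Map<Long, String> pendingFilesPerCheckpoint = new HashMap<>();

    void restore() {
        synchronized (pendingFilesPerCheckpoint) {
            // handler and clear() now run under the same lock
            handlePendingFilesForPreviousCheckpoints(pendingFilesPerCheckpoint);
            pendingFilesPerCheckpoint.clear();
        }
    }

    void handlePendingFilesForPreviousCheckpoints(Map<Long, String> pending) {
        // placeholder: would move each pending file to its final location
    }
}
```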
[jira] [Updated] (FLINK-4770) Migrate core options
[ https://issues.apache.org/jira/browse/FLINK-4770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen updated FLINK-4770: Description: The core options contain everything that is specific to - job - cross TaskManager / JobManager > Migrate core options > > > Key: FLINK-4770 > URL: https://issues.apache.org/jira/browse/FLINK-4770 > Project: Flink > Issue Type: Sub-task > Components: DataSet API, DataStream API >Reporter: Stephan Ewen > > The core options contain everything that is specific to > - job > - cross TaskManager / JobManager
[jira] [Updated] (FLINK-4770) Migrate core options
[ https://issues.apache.org/jira/browse/FLINK-4770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen updated FLINK-4770: Summary: Migrate core options (was: Migrate Job Execution configuration options) > Migrate core options > > > Key: FLINK-4770 > URL: https://issues.apache.org/jira/browse/FLINK-4770 > Project: Flink > Issue Type: Sub-task > Components: DataSet API, DataStream API >Reporter: Stephan Ewen >
[jira] [Commented] (FLINK-5851) Renaming AsyncCollector into ResultPromise/ResultFuture
[ https://issues.apache.org/jira/browse/FLINK-5851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874957#comment-15874957 ] Stephan Ewen commented on FLINK-5851: - My preference would be to stay with the {{(Completable)Future}} naming scheme. For some reason, the term {{Promise}} never resonated with me, especially since completing it with an exception seems totally valid, but "breaks" the promise. > Renaming AsyncCollector into ResultPromise/ResultFuture > --- > > Key: FLINK-5851 > URL: https://issues.apache.org/jira/browse/FLINK-5851 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0, 1.3.0 >Reporter: Till Rohrmann > Fix For: 1.3.0 > > > Currently, the async I/O API gives an {{AsyncCollector}} to an > {{AsyncFunction}} implementation. The name does not really reflect what the > {{AsyncCollector}} does since it does not collect but is actually a one time > completable future. Therefore, I propose to rename the {{AsyncCollector}} > into {{ResultPromise}} or {{ResultFuture}}. This is API changing.
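Stephan's point about the (Completable)Future naming can be illustrated with a small hedged sketch (the class and method below are made up for illustration, not Flink API): completing a future exceptionally is a first-class outcome, whereas a "promise" completed with an exception reads like a broken promise.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical one-time completable result, as in the async I/O discussion:
// exactly one of complete(...) or completeExceptionally(...) wins.
class OneShotResult {
    static CompletableFuture<String> failedLookup() {
        CompletableFuture<String> f = new CompletableFuture<>();
        // An exceptional completion is a normal, supported outcome here.
        f.completeExceptionally(new IllegalStateException("lookup failed"));
        return f;
    }
}
```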
[jira] [Assigned] (FLINK-5843) Website/docs missing Cache-Control HTTP header, can serve stale data
[ https://issues.apache.org/jira/browse/FLINK-5843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Patrick Lucas reassigned FLINK-5843: Assignee: Patrick Lucas Priority: Minor (was: Major) > Website/docs missing Cache-Control HTTP header, can serve stale data > > > Key: FLINK-5843 > URL: https://issues.apache.org/jira/browse/FLINK-5843 > Project: Flink > Issue Type: Bug > Components: Project Website >Reporter: Patrick Lucas >Assignee: Patrick Lucas >Priority: Minor > > When Flink 1.2.0 was released, I found that the [Flink downloads > page|https://flink.apache.org/downloads.html] was out-of-date until I forced > my browser to refresh the page. Upon investigation, I found that the > principle pages of the website are served with only the following headers > that relate to caching: Date, Last-Modified, and ETag. > Since there is no Cache-Control header (or the older Expires or Pragma > headers), browsers are left to their own heuristics as to how long to cache > this content, which varies browser to browser. In some browsers, this > heuristic is 10% of the difference between Date and Last-Modified headers. I > take this to mean that, if the content were last modified 90 days ago, and I > last accessed it 5 days ago, my browser will serve a cached response for a > further 3.5 days (10% * (90 days - 5 days) = 8.5 days, 5 days have elapsed > leaving 3.5 days). > I'm not sure who at the ASF we should talk to about this, but I recommend we > add the following header to any responses served from the Flink project > website or official documentation website\[1]: > {code}Cache-Control: max-age=0, must-revalidate{code} > (Note this will only make browser revalidate their caches; if the ETag of the > cached content matches what the server still has, the server will return 304 > Not Modified and omit the actual content) > \[1] Both the website hosted at flink.apache.org and the documentation hosted > at ci.apache.org are affected. 
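The browser heuristic Patrick describes (with no Cache-Control header, content is treated as fresh for 10% of its age at fetch time) works out as in this hedged sketch; the constants are the example's numbers, not real browser internals:

```java
// Heuristic freshness: with only Date/Last-Modified headers to go on, some
// browsers cache a response for 10% of its age at the time it was fetched.
class HeuristicFreshness {
    static double remainingFreshDays(double lastModifiedDaysAgo, double fetchedDaysAgo) {
        double ageAtFetch = lastModifiedDaysAgo - fetchedDaysAgo; // 85 in the example
        double freshnessLifetime = 0.10 * ageAtFetch;             // 8.5 days
        return freshnessLifetime - fetchedDaysAgo;                // days still served from cache
    }
}
```

With the proposed `Cache-Control: max-age=0, must-revalidate` header this heuristic never applies: the browser revalidates on every request and relies on 304 Not Modified responses when the ETag still matches.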
[jira] [Commented] (FLINK-5854) Introduce some Flink-specific base Exception types
[ https://issues.apache.org/jira/browse/FLINK-5854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874935#comment-15874935 ] ASF GitHub Bot commented on FLINK-5854: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3368 It is arguable whether exceptions should ever have a constructor without a message, I simply did that here for convenience. I have no strong feelings about removing the zero argument constructors. > Introduce some Flink-specific base Exception types > -- > > Key: FLINK-5854 > URL: https://issues.apache.org/jira/browse/FLINK-5854 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Going through the code, there are a lot of places where exception handling > could be done a bit nicer, for example > - Some methods do not declare exceptions at all in their signatures. They > simply catch all and wrap it in a {{RuntimeException}}. > - Some places declare overly generic that they throw {{Exception}}, even > though they could very specifically type the exceptions they throw. > I suggest to introduce two new basic exceptions, that at least help document > a bit more what goes wrong: > - {{FlinkException}} as a base class for checked exceptions that indicate > that something related to using Flink went wrong. Letting a method throw > {{FlinkException}} rather than {{Exception}} already helps to not include all > of Java's runtime exceptions, which indicate programming errors, rather than > situations that should be recovered. > - {{FlinkUncheckedException}} as a Flink-specific subclass of > {{RuntimeException}}. That one can come in handy in places where no > exceptions were declared, for example when reusing an interface that does not > declare exceptions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5854) Introduce some Flink-specific base Exception types
[ https://issues.apache.org/jira/browse/FLINK-5854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874933#comment-15874933 ] ASF GitHub Bot commented on FLINK-5854: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3368 @zentol There are many places in the runtime that declare `throws Exception`, for example virtually all of the state handling code. This always came from the desire to throw `IOException` plus something that expresses that non-I/O stuff related to Flink went wrong. The result was a `throws Exception`, which also means that you have to catch `Exception` which you often don't want (because this included `RuntimeException` and you typically want runtime exception to bubble up a bit further, since they denote bugs by encouraged design). The only place where `throws Exception` really makes sense to me is for `MapFunction` and the likes, to allow them to propagate any type of exception and let recovery handle them. > Introduce some Flink-specific base Exception types > -- > > Key: FLINK-5854 > URL: https://issues.apache.org/jira/browse/FLINK-5854 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Going through the code, there are a lot of places where exception handling > could be done a bit nicer, for example > - Some methods do not declare exceptions at all in their signatures. They > simply catch all and wrap it in a {{RuntimeException}}. > - Some places declare overly generic that they throw {{Exception}}, even > though they could very specifically type the exceptions they throw. > I suggest to introduce two new basic exceptions, that at least help document > a bit more what goes wrong: > - {{FlinkException}} as a base class for checked exceptions that indicate > that something related to using Flink went wrong. 
Letting a method throw > {{FlinkException}} rather than {{Exception}} already helps to not include all > of Java's runtime exceptions, which indicate programming errors, rather than > situations that should be recovered. > - {{FlinkUncheckedException}} as a Flink-specific subclass of > {{RuntimeException}}. That one can come in handy in places where no > exceptions were declared, for example when reusing an interface that does not > declare exceptions.
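A hedged sketch of the ticket's proposal (the exception name follows the ticket; `StateStore` and its method are hypothetical): a Flink-specific checked base exception narrows `throws Exception` without pulling `RuntimeException` into the contract.

```java
import java.io.IOException;

// Checked base class per the proposal; the real signatures may differ.
class FlinkException extends Exception {
    FlinkException(String message) { super(message); }
    FlinkException(String message, Throwable cause) { super(message, cause); }
}

// Instead of "byte[] load(String key) throws Exception", the contract names
// exactly the failure modes callers should recover from; programming errors
// (unchecked exceptions) still bubble up freely.
interface StateStore {
    byte[] load(String key) throws IOException, FlinkException;
}
```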
[GitHub] flink issue #3368: [FLINK-5854] [core] Add base Flink Exception classes
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3368 It is arguable whether exceptions should ever have a constructor without a message; I simply did that here for convenience. I have no strong feelings about removing the zero-argument constructors.
[GitHub] flink issue #3368: [FLINK-5854] [core] Add base Flink Exception classes
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3368 @zentol There are many places in the runtime that declare `throws Exception`, for example virtually all of the state handling code. This always came from the desire to throw `IOException` plus something that expresses that non-I/O stuff related to Flink went wrong. The result was a `throws Exception`, which also means that you have to catch `Exception` which you often don't want (because this included `RuntimeException` and you typically want runtime exception to bubble up a bit further, since they denote bugs by encouraged design). The only place where `throws Exception` really makes sense to me is for `MapFunction` and the likes, to allow them to propagate any type of exception and let recovery handle them.
[GitHub] flink issue #3368: [FLINK-5854] [core] Add base Flink Exception classes
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3368 Could you name 1 or 2 examples for situations where you think it is appropriate to throw a ```FlinkException```? Would invalid arguments (like a String being null) be a reason to do so?
[jira] [Commented] (FLINK-5854) Introduce some Flink-specific base Exception types
[ https://issues.apache.org/jira/browse/FLINK-5854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874926#comment-15874926 ] ASF GitHub Bot commented on FLINK-5854: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3368 Could you name 1 or 2 examples for situations where you think it is appropriate to throw a ```FlinkException```? Would invalid arguments (like a String being null) be a reason to do so? > Introduce some Flink-specific base Exception types > -- > > Key: FLINK-5854 > URL: https://issues.apache.org/jira/browse/FLINK-5854 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Going through the code, there are a lot of places where exception handling > could be done a bit nicer, for example > - Some methods do not declare exceptions at all in their signatures. They > simply catch all and wrap it in a {{RuntimeException}}. > - Some places declare overly generic that they throw {{Exception}}, even > though they could very specifically type the exceptions they throw. > I suggest to introduce two new basic exceptions, that at least help document > a bit more what goes wrong: > - {{FlinkException}} as a base class for checked exceptions that indicate > that something related to using Flink went wrong. Letting a method throw > {{FlinkException}} rather than {{Exception}} already helps to not include all > of Java's runtime exceptions, which indicate programming errors, rather than > situations that should be recovered. > - {{FlinkUncheckedException}} as a Flink-specific subclass of > {{RuntimeException}}. That one can come in handy in places where no > exceptions were declared, for example when reusing an interface that does not > declare exceptions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3368: [FLINK-5854] [core] Add base Flink Exception class...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3368#discussion_r102072659 --- Diff: flink-core/src/main/java/org/apache/flink/util/FlinkRuntimeException.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.apache.flink.annotation.Public; + +/** + * Base class of all Flink-specific unchecked exceptions. + */ +@Public +public class FlinkRuntimeException extends RuntimeException { + + private static final long serialVersionUID = 193141189399279147L; + + /** +* Creates a new exception with a null message and null cause. +*/ + public FlinkRuntimeException() { --- End diff -- Is there a reasonable use-case for an exception without an error message or cause? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5854) Introduce some Flink-specific base Exception types
[ https://issues.apache.org/jira/browse/FLINK-5854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874919#comment-15874919 ] ASF GitHub Bot commented on FLINK-5854: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3368#discussion_r102072659 --- Diff: flink-core/src/main/java/org/apache/flink/util/FlinkRuntimeException.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.apache.flink.annotation.Public; + +/** + * Base class of all Flink-specific unchecked exceptions. + */ +@Public +public class FlinkRuntimeException extends RuntimeException { + + private static final long serialVersionUID = 193141189399279147L; + + /** +* Creates a new exception with a null message and null cause. +*/ + public FlinkRuntimeException() { --- End diff -- Is there a reasonable use-case for an exception without an error message or cause? 
> Introduce some Flink-specific base Exception types > -- > > Key: FLINK-5854 > URL: https://issues.apache.org/jira/browse/FLINK-5854 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Going through the code, there are a lot of places where exception handling > could be done a bit nicer, for example > - Some methods do not declare exceptions at all in their signatures. They > simply catch all and wrap it in a {{RuntimeException}}. > - Some places declare overly generic that they throw {{Exception}}, even > though they could very specifically type the exceptions they throw. > I suggest to introduce two new basic exceptions, that at least help document > a bit more what goes wrong: > - {{FlinkException}} as a base class for checked exceptions that indicate > that something related to using Flink went wrong. Letting a method throw > {{FlinkException}} rather than {{Exception}} already helps to not include all > of Java's runtime exceptions, which indicate programming errors, rather than > situations that should be recovered. > - {{FlinkUncheckedException}} as a Flink-specific subclass of > {{RuntimeException}}. That one can come in handy in places where no > exceptions were declared, for example when reusing an interface that does not > declare exceptions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3368: [FLINK-5854] [core] Add base Flink Exception class...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3368#discussion_r102072425 --- Diff: flink-core/src/main/java/org/apache/flink/util/DynamicCodeLoadingException.java --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.apache.flink.annotation.Public; + +/** + * An exception that is thrown if the dynamic instantiation of code fails. 
+ * + * This exception is supposed to "sum up" the zoo of exceptions typically thrown around + * dynamic code loading and instantiations: + * + * {@code + * try { + * Class.forName(classname).asSubclass(TheType.class).newInstance(); + * } + * catch (ClassNotFoundException | ClassCastException | InstantiationException | IllegalAccessException e) { + * throw new DynamicCodeLoadingException(e); + * } + * } + */ +@Public +public class DynamicCodeLoadingException extends FlinkException { + + private static final long serialVersionUID = -25138443817255490L; + + /** +* Creates a new exception with the given message and cause +* +* @param message The exception message +* @param cause The exception that caused this exception +*/ + public DynamicCodeLoadingException(String message, Throwable cause) { --- End diff -- Would it make sense to make this constructor more explicit in the type of exceptions it accepts? (i.e. one constructor each for the exceptions that are typically thrown in situations that we want to cover) It's probably just bloat, but maybe it would prevent misuse of this exception.
[jira] [Commented] (FLINK-5854) Introduce some Flink-specific base Exception types
[ https://issues.apache.org/jira/browse/FLINK-5854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874916#comment-15874916 ] ASF GitHub Bot commented on FLINK-5854: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3368#discussion_r102072425 --- Diff: flink-core/src/main/java/org/apache/flink/util/DynamicCodeLoadingException.java --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.apache.flink.annotation.Public; + +/** + * An exception that is thrown if the dynamic instantiation of code fails. 
+ * + * This exception is supposed to "sum up" the zoo of exceptions typically thrown around + * dynamic code loading and instantiations: + * + * {@code + * try { + * Class.forName(classname).asSubclass(TheType.class).newInstance(); + * } + * catch (ClassNotFoundException | ClassCastException | InstantiationException | IllegalAccessException e) { + * throw new DynamicCodeLoadingException(e); + * } + * } + */ +@Public +public class DynamicCodeLoadingException extends FlinkException { + + private static final long serialVersionUID = -25138443817255490L; + + /** +* Creates a new exception with the given message and cause +* +* @param message The exception message +* @param cause The exception that caused this exception +*/ + public DynamicCodeLoadingException(String message, Throwable cause) { --- End diff -- Would it make sense to make this constructor more explicit in the type of exceptions it accepts? (i.e. one constructor each for the exceptions that are typically thrown in situations that we want to cover) It's probably just be bloat, but maybe it would prevent misuse of this exception. > Introduce some Flink-specific base Exception types > -- > > Key: FLINK-5854 > URL: https://issues.apache.org/jira/browse/FLINK-5854 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Going through the code, there are a lot of places where exception handling > could be done a bit nicer, for example > - Some methods do not declare exceptions at all in their signatures. They > simply catch all and wrap it in a {{RuntimeException}}. > - Some places declare overly generic that they throw {{Exception}}, even > though they could very specifically type the exceptions they throw. 
> I suggest to introduce two new basic exceptions, that at least help document > a bit more what goes wrong: > - {{FlinkException}} as a base class for checked exceptions that indicate > that something related to using Flink went wrong. Letting a method throw > {{FlinkException}} rather than {{Exception}} already helps to not include all > of Java's runtime exceptions, which indicate programming errors, rather than > situations that should be recovered. > - {{FlinkUncheckedException}} as a Flink-specific subclass of > {{RuntimeException}}. That one can come in handy in places where no > exceptions were declared, for example when reusing an interface that does not > declare exceptions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3368: [FLINK-5854] [core] Add base Flink Exception class...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3368#discussion_r102071979 --- Diff: flink-core/src/main/java/org/apache/flink/util/DynamicCodeLoadingException.java --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.apache.flink.annotation.Public; + +/** + * An exception that is thrown if the dynamic instantiation of code fails. + * + * This exception is supposed to "sum up" the zoo of exceptions typically thrown around + * dynamic code loading and instantiations: + * + * {@code + * try { + * Class.forName(classname).asSubclass(TheType.class).newInstance(); + * } + * catch (ClassNotFoundException | ClassCastException | InstantiationException | IllegalAccessException e) { + * throw new DynamicCodeLoadingException(e); --- End diff -- there is no constructor that matches this line of the javadoc. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5854) Introduce some Flink-specific base Exception types
[ https://issues.apache.org/jira/browse/FLINK-5854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874911#comment-15874911 ] ASF GitHub Bot commented on FLINK-5854: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3368#discussion_r102071979 --- Diff: flink-core/src/main/java/org/apache/flink/util/DynamicCodeLoadingException.java --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.apache.flink.annotation.Public; + +/** + * An exception that is thrown if the dynamic instantiation of code fails. + * + * This exception is supposed to "sum up" the zoo of exceptions typically thrown around + * dynamic code loading and instantiations: + * + * {@code + * try { + * Class.forName(classname).asSubclass(TheType.class).newInstance(); + * } + * catch (ClassNotFoundException | ClassCastException | InstantiationException | IllegalAccessException e) { + * throw new DynamicCodeLoadingException(e); --- End diff -- there is no constructor that matches this line of the javadoc. 
> Introduce some Flink-specific base Exception types > -- > > Key: FLINK-5854 > URL: https://issues.apache.org/jira/browse/FLINK-5854 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Going through the code, there are a lot of places where exception handling > could be done a bit nicer, for example > - Some methods do not declare exceptions at all in their signatures. They > simply catch all and wrap it in a {{RuntimeException}}. > - Some places declare overly generic that they throw {{Exception}}, even > though they could very specifically type the exceptions they throw. > I suggest to introduce two new basic exceptions, that at least help document > a bit more what goes wrong: > - {{FlinkException}} as a base class for checked exceptions that indicate > that something related to using Flink went wrong. Letting a method throw > {{FlinkException}} rather than {{Exception}} already helps to not include all > of Java's runtime exceptions, which indicate programming errors, rather than > situations that should be recovered. > - {{FlinkUncheckedException}} as a Flink-specific subclass of > {{RuntimeException}}. That one can come in handy in places where no > exceptions were declared, for example when reusing an interface that does not > declare exceptions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #2903: [FLINK-5074] [runtime] add a zookeeper based running job ...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2903 I think this looks good, thanks! Merging this... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5074) Implement a RunningJobRegistry based on Zookeeper
[ https://issues.apache.org/jira/browse/FLINK-5074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874889#comment-15874889 ] ASF GitHub Bot commented on FLINK-5074: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2903 I think this looks good, thanks! Merging this... > Implement a RunningJobRegistry based on Zookeeper > -- > > Key: FLINK-5074 > URL: https://issues.apache.org/jira/browse/FLINK-5074 > Project: Flink > Issue Type: Task > Components: Cluster Management >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > For flip-6, it has implemented the ZookeeperHaServices, but > ZookeeperHaServices does not support getRunningJobsRegistry. So need to > implement a ZK based running job registry. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
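The ticket asks for a registry of running jobs backed by ZooKeeper so the flags survive a JobManager failover. The thread does not show the actual `RunningJobsRegistry` interface, so the method names below are assumptions; this in-memory sketch only illustrates the contract that the ZooKeeper-based implementation would fulfill durably:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// In-memory sketch of a running-job registry contract. The real FLINK-5074
// implementation persists these flags in ZooKeeper so they survive a
// JobManager failover; the method names here are assumptions, not the
// actual Flink interface.
public class InMemoryRunningJobsRegistry {

    private final Set<String> runningJobs = ConcurrentHashMap.newKeySet();

    /** Marks a job as running, e.g. when the JobManager starts executing it. */
    public void setJobRunning(String jobId) {
        runningJobs.add(jobId);
    }

    /** Clears the flag, e.g. when the job reaches a terminal state. */
    public void setJobFinished(String jobId) {
        runningJobs.remove(jobId);
    }

    /** Checked during recovery to decide whether a job must be resumed. */
    public boolean isJobRunning(String jobId) {
        return runningJobs.contains(jobId);
    }
}
```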
[GitHub] flink pull request #3369: [FLINK-5831] [webui] order, search and filter metr...
GitHub user nellboy opened a pull request: https://github.com/apache/flink/pull/3369 [FLINK-5831] [webui] order, search and filter metrics Metrics are now ordered and searchable when choosing which metric graphs to display in the web UI. ![screen shot 2017-02-20 at 16 09 58](https://cloud.githubusercontent.com/assets/39847/23136003/fc02da1a-f79a-11e6-90c0-8da1742d42c1.png) You can merge this pull request into a Git repository by running: $ git pull https://github.com/nellboy/flink webui/metrics-filtering Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3369.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3369 commit 730e7b6d3b35855d93049e48850ca5919f7db968 Author: paul Date: 2017-02-20T09:28:14Z [FLINK-5831] [webui] sort metrics by id commit 45f5c565d9a805f78c40ed8ad7cc497aaf5c438f Author: paul Date: 2017-02-20T17:27:13Z [FLINK-5831] [webui] order, search and filter metrics ---
[jira] [Commented] (FLINK-5831) Sort metrics in metric selector and add search box
[ https://issues.apache.org/jira/browse/FLINK-5831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874853#comment-15874853 ] ASF GitHub Bot commented on FLINK-5831: --- GitHub user nellboy opened a pull request: https://github.com/apache/flink/pull/3369 [FLINK-5831] [webui] order, search and filter metrics Metrics are now ordered and searchable when choosing which metric graphs to display in the web UI. ![screen shot 2017-02-20 at 16 09 58](https://cloud.githubusercontent.com/assets/39847/23136003/fc02da1a-f79a-11e6-90c0-8da1742d42c1.png) You can merge this pull request into a Git repository by running: $ git pull https://github.com/nellboy/flink webui/metrics-filtering Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3369.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3369 commit 730e7b6d3b35855d93049e48850ca5919f7db968 Author: paul Date: 2017-02-20T09:28:14Z [FLINK-5831] [webui] sort metrics by id commit 45f5c565d9a805f78c40ed8ad7cc497aaf5c438f Author: paul Date: 2017-02-20T17:27:13Z [FLINK-5831] [webui] order, search and filter metrics > Sort metrics in metric selector and add search box > -- > > Key: FLINK-5831 > URL: https://issues.apache.org/jira/browse/FLINK-5831 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Reporter: Robert Metzger > Attachments: dropDown.png > > > The JobManager UI makes it hard to select metrics using the drop down menu. > First of all, it would be nice to sort all entries. Also a search box on top > of the drop down would make it much easier to find the metrics. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3368: [FLINK-5854] [core] Add base Flink Exception class...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/3368 [FLINK-5854] [core] Add base Flink Exception classes This pull request adds two exception base classes: `FlinkException` and `FlinkRuntimeException`. They are useful in improving the way certain parts of the code handle exceptions. - `FlinkException` is a base class for checked exceptions that indicate that something related to using Flink went wrong. It is helpful, because letting a method throw `FlinkException` rather than `Exception` already helps to not include all of Java's runtime exceptions, which indicate programming errors, rather than situations that should be recovered. - `FlinkRuntimeException` as a Flink-specific subclass of `RuntimeException` comes in handy in places where no exceptions were declared, for example when reusing an interface that does not declare exceptions. **Important: This does not mean we should just declare `FlinkException` everywhere and throw and catch `FlinkException` and `FlinkRuntimeException` arbitrarily. Exception handling remains a careful and conscious task.** This also adds the `DynamicCodeLoadingException` subclass of `FlinkException` as an example. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink exceptions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3368.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3368 commit 1bed2d20a5ccfae4ae7bdfadaaf03fcbe1dba449 Author: Stephan Ewen Date: 2017-02-17T15:24:35Z [FLINK-] [core] Add base Flink Exception classes ---
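The core point of PR #3368 is that declaring `throws FlinkException` narrows a method's contract compared to `throws Exception`, keeping Java's unchecked exceptions (which signal programming errors) out of the declared failure modes. A minimal sketch; the class bodies are assumptions based on the PR description, and `connectToJobManager` is a hypothetical method invented for illustration:

```java
// Minimal sketch of the two base classes described in PR #3368 and how a
// narrowed signature keeps unchecked exceptions out of the declared
// contract. Constructor sets are assumed, not taken from the actual patch.
class FlinkException extends Exception {
    public FlinkException(String message) { super(message); }
    public FlinkException(String message, Throwable cause) { super(message, cause); }
}

class FlinkRuntimeException extends RuntimeException {
    public FlinkRuntimeException(String message) { super(message); }
}

public class ExceptionDemo {
    // Callers must handle Flink-related failures, but programming errors
    // (NullPointerException etc.) are not swept into the declared contract
    // the way `throws Exception` would sweep them in.
    static void connectToJobManager(String address) throws FlinkException {
        if (address == null || address.isEmpty()) {
            throw new FlinkException("no JobManager address configured");
        }
        // ... connection logic would go here ...
    }
}
```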
[jira] [Commented] (FLINK-5854) Introduce some Flink-specific base Exception types
[ https://issues.apache.org/jira/browse/FLINK-5854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874852#comment-15874852 ] ASF GitHub Bot commented on FLINK-5854: --- GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/3368 [FLINK-5854] [core] Add base Flink Exception classes This pull request adds two exception base classes: `FlinkException` and `FlinkRuntimeException`. They are useful in improving the way certain parts of the code handle exceptions. - `FlinkException` is a base class for checked exceptions that indicate that something related to using Flink went wrong. It is helpful, because letting a method throw `FlinkException` rather than `Exception` already helps to not include all of Java's runtime exceptions, which indicate programming errors, rather than situations that should be recovered. - `FlinkRuntimeException` as a Flink-specific subclass of `RuntimeException` comes in handy in places where no exceptions were declared, for example when reusing an interface that does not declare exceptions. **Important: This does not mean we should just declare `FlinkException` everywhere and throw and catch `FlinkException` and `FlinkRuntimeException` arbitrarily. Exception handling remains a careful and conscious task.** This also adds the `DynamicCodeLoadingException` subclass of `FlinkException` as an example. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink exceptions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3368.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3368 commit 1bed2d20a5ccfae4ae7bdfadaaf03fcbe1dba449 Author: Stephan Ewen Date: 2017-02-17T15:24:35Z [FLINK-] [core] Add base Flink Exception classes > Introduce some Flink-specific base Exception types > -- > > Key: FLINK-5854 > URL: https://issues.apache.org/jira/browse/FLINK-5854 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Going through the code, there are a lot of places where exception handling > could be done a bit nicer, for example > - Some methods do not declare exceptions at all in their signatures. They > simply catch all and wrap it in a {{RuntimeException}}. > - Some places declare overly generic that they throw {{Exception}}, even > though they could very specifically type the exceptions they throw. > I suggest to introduce two new basic exceptions, that at least help document > a bit more what goes wrong: > - {{FlinkException}} as a base class for checked exceptions that indicate > that something related to using Flink went wrong. Letting a method throw > {{FlinkException}} rather than {{Exception}} already helps to not include all > of Java's runtime exceptions, which indicate programming errors, rather than > situations that should be recovered. > - {{FlinkUncheckedException}} as a Flink-specific subclass of > {{RuntimeException}}. That one can come in handy in places where no > exceptions were declared, for example when reusing an interface that does not > declare exceptions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5831) Sort metrics in metric selector and add search box
[ https://issues.apache.org/jira/browse/FLINK-5831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874850#comment-15874850 ] ASF GitHub Bot commented on FLINK-5831: --- Github user nellboy closed the pull request at: https://github.com/apache/flink/pull/3362 > Sort metrics in metric selector and add search box > -- > > Key: FLINK-5831 > URL: https://issues.apache.org/jira/browse/FLINK-5831 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Reporter: Robert Metzger > Attachments: dropDown.png > > > The JobManager UI makes it hard to select metrics using the drop down menu. > First of all, it would be nice to sort all entries. Also a search box on top > of the drop down would make it much easier to find the metrics. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3362: [FLINK-5831] [webui] order, search and filter metr...
Github user nellboy closed the pull request at: https://github.com/apache/flink/pull/3362 ---
[jira] [Created] (FLINK-5854) Introduce some Flink-specific base Exception types
Stephan Ewen created FLINK-5854: --- Summary: Introduce some Flink-specific base Exception types Key: FLINK-5854 URL: https://issues.apache.org/jira/browse/FLINK-5854 Project: Flink Issue Type: Improvement Components: Core Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.3.0 Going through the code, there are a lot of places where exception handling could be done a bit nicer, for example - Some methods do not declare exceptions at all in their signatures. They simply catch all and wrap it in a {{RuntimeException}}. - Some places declare overly generic that they throw {{Exception}}, even though they could very specifically type the exceptions they throw. I suggest to introduce two new basic exceptions, that at least help document a bit more what goes wrong: - {{FlinkException}} as a base class for checked exceptions that indicate that something related to using Flink went wrong. Letting a method throw {{FlinkException}} rather than {{Exception}} already helps to not include all of Java's runtime exceptions, which indicate programming errors, rather than situations that should be recovered. - {{FlinkUncheckedException}} as a Flink-specific subclass of {{RuntimeException}}. That one can come in handy in places where no exceptions were declared, for example when reusing an interface that does not declare exceptions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
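The ticket's second bullet describes the case where an interface declares no checked exceptions (e.g. `java.lang.Runnable`), so a Flink-specific unchecked exception must carry the failure out. A sketch of that case; the class body is an assumption based on the ticket text (the pull request ultimately named this class `FlinkRuntimeException`), and `checkpointTask` is a hypothetical example:

```java
// Sketch of the ticket's second case: Runnable.run() cannot declare a
// checked exception, so the failure is wrapped in a Flink-specific
// unchecked exception. The class body is assumed from the ticket text;
// the merged PR named this class FlinkRuntimeException instead.
class FlinkUncheckedException extends RuntimeException {
    public FlinkUncheckedException(String message, Throwable cause) {
        super(message, cause);
    }
}

public class WrapDemo {
    static Runnable checkpointTask() {
        return () -> {
            try {
                // stand-in for an I/O call that declares a checked exception
                throw new java.io.IOException("checkpoint store unreachable");
            } catch (java.io.IOException e) {
                // Runnable.run() declares no exceptions, so wrap and rethrow
                throw new FlinkUncheckedException("checkpoint failed", e);
            }
        };
    }
}
```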
[jira] [Commented] (FLINK-5819) Improve metrics reporting
[ https://issues.apache.org/jira/browse/FLINK-5819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874840#comment-15874840 ] ASF GitHub Bot commented on FLINK-5819: --- GitHub user nellboy opened a pull request: https://github.com/apache/flink/pull/3367 [FLINK-5819] [webui] implements numeric option on metrics graphs [FLINK-5819] [webui] implements numeric option on metrics graphs ![pasted image at 2017_02_17 13_56](https://cloud.githubusercontent.com/assets/39847/23135607/937a75da-f799-11e6-9dbb-6e0c877f615b.png) You can merge this pull request into a Git repository by running: $ git pull https://github.com/nellboy/flink webui/numeric-metrics Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3367.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3367 commit 3d05e2dfdf0bef516cfdc0e298e37e2583a8187c Author: paul Date: 2017-02-20T17:20:45Z [FLINK-5819] [webui] implements numeric option on metrics graphs > Improve metrics reporting > - > > Key: FLINK-5819 > URL: https://issues.apache.org/jira/browse/FLINK-5819 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Affects Versions: 1.3.0 >Reporter: Paul Nelligan > Labels: web-ui > Original Estimate: 48h > Remaining Estimate: 48h > > When displaying individual metrics for a vertex / node of a job in the webui, > it is desirable to add an option to display metrics as a numeric or as a > chart. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3367: [FLINK-5819] [webui] implements numeric option on ...
GitHub user nellboy opened a pull request: https://github.com/apache/flink/pull/3367 [FLINK-5819] [webui] implements numeric option on metrics graphs [FLINK-5819] [webui] implements numeric option on metrics graphs ![pasted image at 2017_02_17 13_56](https://cloud.githubusercontent.com/assets/39847/23135607/937a75da-f799-11e6-9dbb-6e0c877f615b.png) You can merge this pull request into a Git repository by running: $ git pull https://github.com/nellboy/flink webui/numeric-metrics Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3367.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3367 commit 3d05e2dfdf0bef516cfdc0e298e37e2583a8187c Author: paul Date: 2017-02-20T17:20:45Z [FLINK-5819] [webui] implements numeric option on metrics graphs ---
[jira] [Commented] (FLINK-5819) Improve metrics reporting
[ https://issues.apache.org/jira/browse/FLINK-5819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874838#comment-15874838 ] ASF GitHub Bot commented on FLINK-5819: --- Github user nellboy closed the pull request at: https://github.com/apache/flink/pull/3361 > Improve metrics reporting > - > > Key: FLINK-5819 > URL: https://issues.apache.org/jira/browse/FLINK-5819 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Affects Versions: 1.3.0 >Reporter: Paul Nelligan > Labels: web-ui > Original Estimate: 48h > Remaining Estimate: 48h > > When displaying individual metrics for a vertex / node of a job in the webui, > it is desirable to add an option to display metrics as a numeric or as a > chart. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3361: [FLINK-5819] [webui] implements numeric option on ...
Github user nellboy closed the pull request at: https://github.com/apache/flink/pull/3361 ---
[GitHub] flink pull request #3333: Webui/watermarks
Github user nellboy closed the pull request at: https://github.com/apache/flink/pull/ ---
[GitHub] flink pull request #3366: Webui/watermarks tab
GitHub user nellboy opened a pull request: https://github.com/apache/flink/pull/3366 Webui/watermarks tab Implements a watermarks tab and displays low watermarks on graph nodes. ![screen shot 2017-02-20 at 17 54 51](https://cloud.githubusercontent.com/assets/39847/23135205/fb8419f8-f797-11e6-8ee3-24bd50296a77.png) You can merge this pull request into a Git repository by running: $ git pull https://github.com/nellboy/flink webui/watermarks-tab Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3366.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3366 commit 25874c6c60889d0e9dd5c491a6612957763f338f Author: paul Date: 2017-02-16T10:25:45Z [FLINK-3427] [webui] implements watermarks tab commit 0bc569635b88cd69d72f8b792862c03570dcdced Author: paul Date: 2017-02-16T10:57:26Z [FLINK-3427] [webui] display watermarks index only instead of full id commit 90f727ca1535e2d0be5302efe5b8a834d29f901c Author: paul Date: 2017-02-20T09:54:12Z [FLINK-3427] [webui] replace watermarks parsing in controller with filters commit 9fb2d4205b1e59ffe5d538fee5092abb0354ee4c Author: paul Date: 2017-02-20T09:58:35Z [FLINK-3427] [webui] set watermarks to null on node change commit 93a7a61c8b5852dbc3c86336b247d7e21421b662 Author: paul Date: 2017-02-20T17:08:45Z [FLINK-3427] [webui] display low watermarks on node graph ---
[jira] [Updated] (FLINK-5845) CEP: unify key and non-keyed operators
[ https://issues.apache.org/jira/browse/FLINK-5845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas updated FLINK-5845: -- Description: Currently the keyed and non-keyed operators in the CEP library have different implementations. This issue targets to unify them into one. This new implementation will always be applied on a keyed stream, and in the case of non-keyed usecases, the input stream will be keyed on a dummy key, as done in the case of the {{DataStream.windowAll()}} method, where the input stream is keyed using the {{NullByteKeySelector}}. This is a first step towards making the CEP operators rescalable. was: Currently the keyed and non-keyed operators in the CEP library have different implementations. This issue targets to unify them into one. This new implementation will always be applied on a keyed stream, and in the case of non-keyed usecases, the input stream will be keyed on a dummy keye, as done in the case of the {{DataStream.windowAll()}} method, where the input stream is keyed using the {{NullByteKeySelector}}. This is a first step towards making the CEP operators rescalable. > CEP: unify key and non-keyed operators > -- > > Key: FLINK-5845 > URL: https://issues.apache.org/jira/browse/FLINK-5845 > Project: Flink > Issue Type: Sub-task > Components: CEP >Affects Versions: 1.3.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.3.0 > > > Currently the keyed and non-keyed operators in the CEP library have different > implementations. This issue targets to unify them into one. > This new implementation will always be applied on a keyed stream, and in the > case of non-keyed usecases, the input stream will be keyed on a dummy key, as > done in the case of the {{DataStream.windowAll()}} method, where the input > stream is keyed using the {{NullByteKeySelector}}. > This is a first step towards making the CEP operators rescalable. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
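The unification described above runs non-keyed CEP on a keyed stream by assigning every element the same dummy key, as `DataStream.windowAll()` does with `NullByteKeySelector`. A self-contained sketch of that idea; Flink's actual `org.apache.flink.api.java.functions.KeySelector` is not on the classpath here, so a minimal stand-in interface is defined:

```java
// Self-contained sketch of keying every element by the same dummy key, as
// Flink's NullByteKeySelector does for windowAll(): all elements map to a
// single key, so a keyed operator can also serve the non-keyed case. The
// KeySelector interface below is a minimal stand-in for Flink's, defined
// only so this sketch compiles on its own.
public class DummyKeyDemo {

    @FunctionalInterface
    interface KeySelector<IN, KEY> {
        KEY getKey(IN value);
    }

    /** Every record gets the same key, collapsing the stream into one key group. */
    static <T> KeySelector<T, Byte> nullByteKeySelector() {
        return value -> (byte) 0;
    }
}
```

The cost of this trick is that all elements land in a single key group, so the "non-keyed" operator still runs with effective parallelism one for that key; the ticket frames it as a stepping stone toward rescalable CEP operators.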
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874820#comment-15874820 ] ASF GitHub Bot commented on FLINK-4520: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/2487 @haoch What do you think about Robert's suggestion to move this to Bahir? Seems like a reasonable first step to me. > Integrate Siddhi as a lightweight CEP Library > - > > Key: FLINK-4520 > URL: https://issues.apache.org/jira/browse/FLINK-4520 > Project: Flink > Issue Type: New Feature > Components: CEP >Affects Versions: 1.2.0 >Reporter: Hao Chen >Assignee: Hao Chen > Labels: cep, library, patch-available > > h1. flink-siddhi proposal > h2. Abstraction > Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event > Processing Engine (CEP) released as a Java Library under `Apache Software > License v2.0`. Siddhi CEP processes events which are generated by various > event sources, analyses them and notifies appropriate complex events > according to the user specified queries. > It would be very helpful for flink users (especially streaming application > developer) to provide a library to run Siddhi CEP query directly in Flink > streaming application. > * http://wso2.com/products/complex-event-processor/ > * https://github.com/wso2/siddhi > h2. Features > * Integrate Siddhi CEP as an stream operator (i.e. > `TupleStreamSiddhiOperator`), supporting rich CEP features like > * Filter > * Join > * Aggregation > * Group by > * Having > * Window > * Conditions and Expressions > * Pattern processing > * Sequence processing > * Event Tables > ... > * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See > `SiddhiCEP` and `SiddhiStream`) > * Register Flink DataStream associating native type information with > Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc. 
> * Connect with single or multiple Flink DataStreams with Siddhi CEP > Execution Plan > * Return output stream as DataStream with type intelligently inferred > from Siddhi Stream Schema > * Integrate siddhi runtime state management with Flink state (See > `AbstractSiddhiOperator`) > * Support siddhi plugin management to extend CEP functions. (See > `SiddhiCEP#registerExtension`) > h2. Test Cases > * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: > https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java > h2. Example > {code} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env); > cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class); > cep.registerStream("inputStream1", input1, "id", "name", > "price","timestamp"); > cep.registerStream("inputStream2", input2, "id", "name", > "price","timestamp"); > DataStream> output = cep > .from("inputStream1").union("inputStream2") > .sql( > "from every s1 = inputStream1[id == 2] " > + " -> s2 = inputStream2[id == 3] " > + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as > name_2 , custom:plus(s1.price,s2.price) as price" > + "insert into outputStream" > ) > .returns("outputStream"); > env.execute(); > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4499) Introduce findbugs maven plugin
[ https://issues.apache.org/jira/browse/FLINK-4499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874818#comment-15874818 ] ASF GitHub Bot commented on FLINK-4499: --- Github user uce commented on the issue: https://github.com/apache/flink/pull/2422 I think that the increased build times are a blocker for executing this with every CI run. Maybe this could become something that we execute nightly instead? Not ideal, but better than nothing. > Introduce findbugs maven plugin > --- > > Key: FLINK-4499 > URL: https://issues.apache.org/jira/browse/FLINK-4499 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Ted Yu >Assignee: Suneel Marthi > > As suggested by Stephan in FLINK-4482, this issue is to add > findbugs-maven-plugin into the build process so that we can detect lack of > proper locking and other defects automatically. > We can begin with small set of rules. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-we...
Github user uce commented on the issue: https://github.com/apache/flink/pull/2487 @haoch What do you think about Robert's suggestion to move this to Bahir? Seems like a reasonable first step to me. ---