[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers
[ https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463386#comment-16463386 ] ASF GitHub Bot commented on FLINK-9190: --- Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/5931 @GJL In Blink, we solve this problem as follows. When a container completes, we first check whether the container has already registered. If it has registered before, the RM will not request a new container, as the job master will ask for one during failover. If it has not registered, the RM will request a new one. > YarnResourceManager sometimes does not request new Containers > - > > Key: FLINK-9190 > URL: https://issues.apache.org/jira/browse/FLINK-9190 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, YARN >Affects Versions: 1.5.0 > Environment: Hadoop 2.8.3 > ZooKeeper 3.4.5 > Flink 71c3cd2781d36e0a03d022a38cc4503d343f7ff8 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > Attachments: yarn-logs > > > *Description* > The {{YarnResourceManager}} does not request new containers if > {{TaskManagers}} are killed rapidly in succession. After 5 minutes the job is > restarted due to {{NoResourceAvailableException}}, and the job runs normally > afterwards. I suspect that {{TaskManager}} failures are not registered if the > failure occurs before the {{TaskManager}} registers with the master. Logs are > attached; I added additional log statements to > {{YarnResourceManager.onContainersCompleted}} and > {{YarnResourceManager.onContainersAllocated}}. > *Expected Behavior* > The {{YarnResourceManager}} should recognize that the container is completed > and keep requesting new containers. The job should run as soon as resources > are available. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5931: [FLINK-9190][flip6,yarn] Request new container if contain...
Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/5931 @GJL In Blink, we solve this problem as follows. When a container completes, we first check whether the container has already registered. If it has registered before, the RM will not request a new container, as the job master will ask for one during failover. If it has not registered, the RM will request a new one. ---
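The check-before-rerequest policy described in the comment above can be sketched as follows. This is an illustrative model only, not actual Flink or Blink code, and the class and method names are hypothetical:

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Illustrative model (not actual Flink/Blink code; names are hypothetical) of
 * the policy described in the comment: on container completion, the RM only
 * requests a replacement container when the completed container never
 * registered, because a registered container's resources are re-requested by
 * the job master during failover.
 */
public class ContainerCompletionPolicy {

    // IDs of containers whose TaskManager has registered with the RM.
    private final Set<String> registeredContainers = new HashSet<>();

    public void onContainerRegistered(String containerId) {
        registeredContainers.add(containerId);
    }

    /** Called when YARN reports a container as completed. */
    public boolean shouldRequestReplacement(String completedContainerId) {
        // remove() returns true only if the container had registered before,
        // in which case no replacement is requested here.
        return !registeredContainers.remove(completedContainerId);
    }
}
```

This directly addresses the bug scenario: a container that dies before registering still triggers a replacement request.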
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463384#comment-16463384 ] ASF GitHub Bot commented on FLINK-9169: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5950 I think even with the `UnloadableTypeSerializerException` exception bubbling approach, we actually still need a flag in the serialization proxy to decide how to handle the exception. The serialization proxy handles deserialization of all meta data of all registered key states, so that would be the highest level where we need to decide whether or not to use the dummy serializer. If we want to hand out this control to an even higher level (i.e. the backend), we would then need to break up the deserialization logic from the serialization proxy, which IMO isn't appropriate. > NPE when restoring from old savepoint and state serializer could not be > deserialized > > > Key: FLINK-9169 > URL: https://issues.apache.org/jira/browse/FLINK-9169 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Till Rohrmann >Assignee: Tzu-Li (Gordon) Tai >Priority: Major > Fix For: 1.5.0 > > > A user reported to have observed the following exception when restoring a > Flink job from a 1.3 savepoint with Flink 1.4. > {code} > 2018-04-02 21:44:18,146 INFO org.apache.flink.runtime.taskmanager.Task > - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65 > 6fa6) switched from RUNNING to FAILED. > java.lang.IllegalStateException: Could not initialize keyed state backend. 
> at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NullPointerException > at > org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58) > at > org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.(RegisteredKeyedBackendStateMetaInfo.java:53) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateB > ackend.java:1216) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeye > dStateBackend.java:1153) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1 > 139) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283) > ... 6 more > {code} > Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create > {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} where the > {{stateSerializer}} can be {{null}}. 
This is not the problem, however, in > {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a > {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}} > where we null check the state serializer. This will then fail with an > indescriptive NPE. > I think the same should happen when resuming with Flink 1.5 from a 1.4 > savepoint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5950 I think even with the `UnloadableTypeSerializerException` exception bubbling approach, we actually still need a flag in the serialization proxy to decide how to handle the exception. The serialization proxy handles deserialization of all meta data of all registered key states, so that would be the highest level where we need to decide whether or not to use the dummy serializer. If we want to hand out this control to an even higher level (i.e. the backend), we would then need to break up the deserialization logic from the serialization proxy, which IMO isn't appropriate. ---
[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5950 @StefanRRichter yes, now that you mention it, the `isSerializerPresenceRequiredFlag` does seem a bit awkward to have in the serialization proxy. Essentially, all it does is serve as a switch that decides whether or not to fail, something that could be done by the caller. I'll quickly try your suggested approach and see how that turns out. ---
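The refactoring being discussed, bubbling an exception out of the reader and letting the caller decide between a dummy serializer and failing, can be sketched like this. All names here are hypothetical stand-ins, not the actual serialization proxy or backend code:

```java
/**
 * Standalone sketch (hypothetical names, not the actual proxy/backend code)
 * of the approach discussed: the reader throws when a serializer class is
 * unloadable, and the caller decides whether to substitute a dummy
 * serializer or to propagate the failure.
 */
public class SerializerRestoreSketch {

    static class UnloadableTypeSerializerException extends RuntimeException {
        UnloadableTypeSerializerException(String message) {
            super(message);
        }
    }

    /** Stand-in for deserializing a serializer from snapshot metadata. */
    static String readSerializer(boolean classIsLoadable) {
        if (!classIsLoadable) {
            throw new UnloadableTypeSerializerException("serializer class not found");
        }
        return "RealSerializer";
    }

    /** The caller, not the proxy, owns the fail-or-fallback decision. */
    public static String restore(boolean classIsLoadable, boolean presenceRequired) {
        try {
            return readSerializer(classIsLoadable);
        } catch (UnloadableTypeSerializerException e) {
            if (presenceRequired) {
                throw e; // e.g. the backend cannot proceed without it
            }
            return "DummySerializer"; // placeholder until re-registration
        }
    }
}
```

The point of the design is that the "switch" lives at the call site, so the proxy itself stays a pure deserializer.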
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463363#comment-16463363 ] ASF GitHub Bot commented on FLINK-9169: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5950 @StefanRRichter yes, now that you mentioned it, the `isSerializerPresenceRequiredFlag` does seem a bit awkward to be in the serialization proxy. Essentially, what it is only doing is serving as a switch to decide whether or not to fail - something that could be done by the caller. I'll quickly try your suggested approach and see how that turns out. > NPE when restoring from old savepoint and state serializer could not be > deserialized > > > Key: FLINK-9169 > URL: https://issues.apache.org/jira/browse/FLINK-9169 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Till Rohrmann >Assignee: Tzu-Li (Gordon) Tai >Priority: Major > Fix For: 1.5.0 > > > A user reported to have observed the following exception when restoring a > Flink job from a 1.3 savepoint with Flink 1.4. > {code} > 2018-04-02 21:44:18,146 INFO org.apache.flink.runtime.taskmanager.Task > - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65 > 6fa6) switched from RUNNING to FAILED. > java.lang.IllegalStateException: Could not initialize keyed state backend. 
> at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NullPointerException > at > org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58) > at > org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.(RegisteredKeyedBackendStateMetaInfo.java:53) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateB > ackend.java:1216) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeye > dStateBackend.java:1153) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1 > 139) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283) > ... 6 more > {code} > Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create > {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} where the > {{stateSerializer}} can be {{null}}. 
This is not the problem, however, in > {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a > {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}} > where we null check the state serializer. This will then fail with an > indescriptive NPE. > I think the same should happen when resuming with Flink 1.5 from a 1.4 > savepoint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type
[ https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-9294: - Description: Most of the UDF function signatures that include composite types such as *{{MAP}}*, *{{ARRAY}}*, etc. require the user to override the *{{getParameterType}}* or *{{getResultType}}* method explicitly. It should be possible to resolve the composite type based on the function signature, such as: {code:java} public List eval(Map mapArg) { //... } {code} should automatically resolve that: - *{{ObjectArrayTypeInfo}}* to be the result type. - *{{MapTypeInfo}}* to be the parameter type. was: Most of the UDF function signatures that includes composite types such as *{{MAP}}*, *{{ARRAY}}*, etc would require user to override *{{getParameterType}}* or *{{getResultType}}* method explicitly. It should be able to resolve the composite type based on the function signature, such as: {code:java} public List eval(Map mapArg) { //... } {code} should automatically resolve *{{ObjectArrayTypeInfo}}* & *{{MapTypeInfo}}* to be the result type and parameter type. > Improve type inference for UDFs with composite parameter or result type > > > Key: FLINK-9294 > URL: https://issues.apache.org/jira/browse/FLINK-9294 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Most of the UDF function signatures that include composite types such as > *{{MAP}}*, *{{ARRAY}}*, etc. require the user to override > *{{getParameterType}}* or *{{getResultType}}* method explicitly. > It should be possible to resolve the composite type based on the function > signature, such as: > {code:java} > public List eval(Map mapArg) { > //... > } > {code} > should automatically resolve that: > - *{{ObjectArrayTypeInfo}}* to be the result type. > - *{{MapTypeInfo}}* to be the > parameter type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type
[ https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-9294: - Description: Most of the UDF function signatures that include composite types such as *{{MAP}}*, *{{ARRAY}}*, etc. require the user to override the *{{getParameterType}}* or *{{getResultType}}* method explicitly. It should be possible to resolve the composite type based on the function signature, such as: {code:java} public List eval(Map mapArg) { //... } {code} should automatically resolve *{{ObjectArrayTypeInfo}}* & *{{MapTypeInfo}}* to be the result type and parameter type. was: Most of the UDF function signatures that includes composite types such as *{{MAP}}*, *{{ARRAY}}*, etc would require user to override *{{getParameterType}}* or *{{getResultType}}* method explicitly. It should be able to resolve the composite type based on the function signature. > Improve type inference for UDFs with composite parameter or result type > > > Key: FLINK-9294 > URL: https://issues.apache.org/jira/browse/FLINK-9294 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Most of the UDF function signatures that include composite types such as > *{{MAP}}*, *{{ARRAY}}*, etc. require the user to override > *{{getParameterType}}* or *{{getResultType}}* method explicitly. > It should be possible to resolve the composite type based on the function > signature, such as: > {code:java} > public List eval(Map mapArg) { > //... > } > {code} > should automatically resolve *{{ObjectArrayTypeInfo}}* > & *{{MapTypeInfo}}* to be the > result type and parameter type. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
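The improvement rests on the fact that the full generic signature of {{eval}} is available via reflection, so composite parameter and result types can in principle be derived without explicit overrides. A standalone sketch of that core idea, illustrative only and not the proposed Flink implementation (the UDF class here is made up):

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Illustrative sketch (not the proposed Flink implementation) of the core
 * idea: the generic parameter and return types of an eval method are fully
 * recoverable via reflection, so composite types like MAP and ARRAY could be
 * derived without explicit getResultType/getParameterTypes overrides.
 */
public class EvalSignatureInspection {

    /** A UDF-like class whose eval takes a MAP-typed argument and returns a list. */
    public static class MyUdf {
        public List<Integer> eval(Map<String, Integer> mapArg) {
            return new ArrayList<>(mapArg.values());
        }
    }

    private static Method findEval(Class<?> udfClass) {
        for (Method m : udfClass.getMethods()) {
            if (m.getName().equals("eval")) {
                return m;
            }
        }
        throw new IllegalArgumentException("no eval method on " + udfClass);
    }

    /** Generic return type of eval, e.g. java.util.List&lt;java.lang.Integer&gt;. */
    public static String genericResultType(Class<?> udfClass) {
        return findEval(udfClass).getGenericReturnType().getTypeName();
    }

    /** Generic type of eval's first parameter. */
    public static String genericParameterType(Class<?> udfClass) {
        return findEval(udfClass).getGenericParameterTypes()[0].getTypeName();
    }
}
```

From these reflective type strings a type-inference layer could construct the corresponding TypeInformation (e.g. an ObjectArrayTypeInfo/ListTypeInfo result and a MapTypeInfo parameter).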
[jira] [Comment Edited] (FLINK-7795) Utilize error-prone to discover common coding mistakes
[ https://issues.apache.org/jira/browse/FLINK-7795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345955#comment-16345955 ] Ted Yu edited comment on FLINK-7795 at 5/3/18 11:40 PM: error-prone has JDK 8 dependency . was (Author: yuzhih...@gmail.com): error-prone has JDK 8 dependency. > Utilize error-prone to discover common coding mistakes > -- > > Key: FLINK-7795 > URL: https://issues.apache.org/jira/browse/FLINK-7795 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Ted Yu >Priority: Major > > http://errorprone.info/ is a tool which detects common coding mistakes. > We should incorporate it into the Flink build process. > Here are the dependencies (the XML markup was stripped in transit; reconstructed as standard Maven dependency blocks): > {code}
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>error_prone_annotation</artifactId>
>   <version>${error-prone.version}</version>
>   <scope>provided</scope>
> </dependency>
> <dependency>
>   <groupId>com.google.auto.service</groupId>
>   <artifactId>auto-service</artifactId>
>   <version>1.0-rc3</version>
>   <optional>true</optional>
> </dependency>
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>error_prone_check_api</artifactId>
>   <version>${error-prone.version}</version>
>   <scope>provided</scope>
>   <exclusions>
>     <exclusion>
>       <groupId>com.google.code.findbugs</groupId>
>       <artifactId>jsr305</artifactId>
>     </exclusion>
>   </exclusions>
> </dependency>
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>javac</artifactId>
>   <version>9-dev-r4023-3</version>
>   <scope>provided</scope>
> </dependency>
> {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-8554) Upgrade AWS SDK
[ https://issues.apache.org/jira/browse/FLINK-8554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-8554: --- Assignee: vinoyang > Upgrade AWS SDK > --- > > Key: FLINK-8554 > URL: https://issues.apache.org/jira/browse/FLINK-8554 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Assignee: vinoyang >Priority: Minor > > AWS SDK 1.11.271 fixes a lot of bugs. > One of which would exhibit the following: > {code} > Caused by: java.lang.NullPointerException > at com.amazonaws.metrics.AwsSdkMetrics.getRegion(AwsSdkMetrics.java:729) > at com.amazonaws.metrics.MetricAdmin.getRegion(MetricAdmin.java:67) > at sun.reflect.GeneratedMethodAccessor132.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-6719) Add details about fault-tolerance of timers to ProcessFunction docs
[ https://issues.apache.org/jira/browse/FLINK-6719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-6719. Resolution: Fixed Fix Version/s: (was: 1.5.1) (was: 1.6.0) 1.4.3 1.5.0 Fixed for 1.6.0 with 2cfd89c845ce2341c94a88e5cea7a4c86419b25f Fixed for 1.5.0 with 9435cd4fe54fd8600d661175dcf00d6b4464e200 Fixed for 1.4.3 with b84cdda4d5fa4ded807c13138f5742e379be453b > Add details about fault-tolerance of timers to ProcessFunction docs > --- > > Key: FLINK-6719 > URL: https://issues.apache.org/jira/browse/FLINK-6719 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Documentation >Affects Versions: 1.5.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0, 1.4.3 > > > The fault-tolerance of timers is a frequently asked question on the mailing > lists. We should add details about the topic in the {{ProcessFunction}} docs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6719) Add details about fault-tolerance of timers to ProcessFunction docs
[ https://issues.apache.org/jira/browse/FLINK-6719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463147#comment-16463147 ] ASF GitHub Bot commented on FLINK-6719: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5887 > Add details about fault-tolerance of timers to ProcessFunction docs > --- > > Key: FLINK-6719 > URL: https://issues.apache.org/jira/browse/FLINK-6719 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Documentation >Affects Versions: 1.5.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Bowen Li >Priority: Major > Fix For: 1.6.0, 1.5.1 > > > The fault-tolerance of timers is a frequently asked question on the mailing > lists. We should add details about the topic in the {{ProcessFunction}} docs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5887: [FLINK-6719] [docs] Add details about fault-tolera...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5887 ---
[jira] [Resolved] (FLINK-9293) SlotPool should check slot id when accepting a slot offer with existing allocation id
[ https://issues.apache.org/jira/browse/FLINK-9293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-9293. - Resolution: Fixed Fixed in - 1.5.0 via df1eda8646a769b419388db2cf699cc53b009849 - 1.6.0 via bbaf82ebe245d4758e73aa928d79a3708c816311 > SlotPool should check slot id when accepting a slot offer with existing > allocation id > - > > Key: FLINK-9293 > URL: https://issues.apache.org/jira/browse/FLINK-9293 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.5.0 >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > For flip-6, two or more slots may be assigned to the same slot allocation. > For example, taskExecutor1 registers and allocationID1 is assigned to its slot1, > but from taskExecutor1's side the registration times out, so it registers again; > the RM will fail allocationID1 and assign slot2 on taskExecutor2 to it. However, > taskExecutor1 has already accepted allocationID1, so taskExecutor1 and taskExecutor2 > both offer a slot to the job master with allocationID1. Currently, the slot pool > just accepts all slot offers, which may leak one slot. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9293) SlotPool should check slot id when accepting a slot offer with existing allocation id
[ https://issues.apache.org/jira/browse/FLINK-9293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-9293. --- > SlotPool should check slot id when accepting a slot offer with existing > allocation id > - > > Key: FLINK-9293 > URL: https://issues.apache.org/jira/browse/FLINK-9293 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.5.0 >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > For flip-6, two or more slots may be assigned to the same slot allocation. > For example, taskExecutor1 registers and allocationID1 is assigned to its slot1, > but from taskExecutor1's side the registration times out, so it registers again; > the RM will fail allocationID1 and assign slot2 on taskExecutor2 to it. However, > taskExecutor1 has already accepted allocationID1, so taskExecutor1 and taskExecutor2 > both offer a slot to the job master with allocationID1. Currently, the slot pool > just accepts all slot offers, which may leak one slot. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
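The fix described in the issue title amounts to remembering, per allocation ID, the slot ID that fulfilled it, and rejecting any later offer for the same allocation that names a different slot. A self-contained sketch of that bookkeeping (hypothetical names, not the actual SlotPool code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Self-contained sketch (hypothetical names, not the actual SlotPool code) of
 * the bookkeeping the fix calls for: remember which slot fulfilled each
 * allocation ID and reject a later offer for the same allocation that names a
 * different slot, so a stale duplicate offer cannot leak a slot.
 */
public class SlotOfferBookkeeping {

    // allocation ID -> slot ID that fulfilled it
    private final Map<String, String> fulfilledAllocations = new HashMap<>();

    /** Returns true if the offer is accepted. */
    public boolean offerSlot(String allocationId, String slotId) {
        String existing = fulfilledAllocations.get(allocationId);
        if (existing == null) {
            fulfilledAllocations.put(allocationId, slotId);
            return true; // first offer for this allocation
        }
        // Accept only an idempotent retry of the same slot; reject an offer
        // for the same allocation coming from a different slot.
        return Objects.equals(existing, slotId);
    }
}
```

In the issue's scenario, taskExecutor2's offer for allocationID1 would be rejected because taskExecutor1's slot already fulfilled it, so the second slot is returned instead of leaked.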
[jira] [Commented] (FLINK-9293) SlotPool should check slot id when accepting a slot offer with existing allocation id
[ https://issues.apache.org/jira/browse/FLINK-9293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463122#comment-16463122 ] ASF GitHub Bot commented on FLINK-9293: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5951 > SlotPool should check slot id when accepting a slot offer with existing > allocation id > - > > Key: FLINK-9293 > URL: https://issues.apache.org/jira/browse/FLINK-9293 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.5.0 >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > For flip-6, two or more slots may be assigned to the same slot allocation. > For example, taskExecutor1 registers and allocationID1 is assigned to its slot1, > but from taskExecutor1's side the registration times out, so it registers again; > the RM will fail allocationID1 and assign slot2 on taskExecutor2 to it. However, > taskExecutor1 has already accepted allocationID1, so taskExecutor1 and taskExecutor2 > both offer a slot to the job master with allocationID1. Currently, the slot pool > just accepts all slot offers, which may leak one slot. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5951: [FLINK-9293] [runtime] SlotPool should check slot ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5951 ---
[jira] [Commented] (FLINK-9141) Calling getSideOutput() and split() on one DataStream causes NPE
[ https://issues.apache.org/jira/browse/FLINK-9141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463093#comment-16463093 ] ASF GitHub Bot commented on FLINK-9141: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5836#discussion_r185942219 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +import org.junit.Test; + +import java.util.Collections; + +/** + * Tests that verify correct behavior when applying split/getSideOutput operations on one {@link DataStream}. 
+ */ +public class SplitSideOutputTest { + + private static final OutputTag outputTag = new OutputTag("outputTag") {}; + + @Test + public void testSideOutputAfterSelectIsForbidden() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + SingleOutputStreamOperator processInput = env.fromElements("foo") + .process(new DummyProcessFunction()); + + processInput.split(Collections::singleton); + + try { + processInput.getSideOutput(outputTag); + } catch (UnsupportedOperationException expected){ + // expected + } + } + + @Test + public void testSelectAfterSideOutputIsForbidden() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + SingleOutputStreamOperator processInput = env.fromElements("foo") + .process(new DummyProcessFunction()); + + processInput.getSideOutput(outputTag); + + try { + processInput.split(Collections::singleton); --- End diff -- same as above > Calling getSideOutput() and split() on one DataStream causes NPE > > > Key: FLINK-9141 > URL: https://issues.apache.org/jira/browse/FLINK-9141 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Critical > > Calling both {{getSideOutput()}} and {{split()}} on one DataStream causes a > {{NullPointerException}} to be thrown at runtime. > As a work-around one can add a no-op map function before the split() call. 
> Exception: > {code} > Caused by: java.lang.NullPointerException > at > org.apache.flink.streaming.api.collector.selector.DirectedOutput.(DirectedOutput.java:79) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:326) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:128) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:274) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:745) > {code} > Reproducer: > {code} > private static final OutputTag tag = new OutputTag("tag") {}; > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > DataStream dataStream1 = env.fromElements("foo"); > SingleOutputStreamOperator processedStream = dataStream1 > .process(new ProcessFunction() { > @Override > public void processElement(String value, Context ctx, > Collector out) { > } > }); > processedStream.getSideOutput(tag) > .print(); > processedStream > .split(Collections::singletonList) > .select("bar") > .print(); > env.execute(); > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9141) Calling getSideOutput() and split() on one DataStream causes NPE
[ https://issues.apache.org/jira/browse/FLINK-9141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463092#comment-16463092 ] ASF GitHub Bot commented on FLINK-9141: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5836#discussion_r185942147 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +import org.junit.Test; + +import java.util.Collections; + +/** + * Tests that verify correct behavior when applying split/getSideOutput operations on one {@link DataStream}. 
+ */ +public class SplitSideOutputTest { + + private static final OutputTag outputTag = new OutputTag("outputTag") {}; + + @Test + public void testSideOutputAfterSelectIsForbidden() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + SingleOutputStreamOperator processInput = env.fromElements("foo") + .process(new DummyProcessFunction()); + + processInput.split(Collections::singleton); + + try { + processInput.getSideOutput(outputTag); --- End diff -- add `Assert.fail();` after `processInput.getSideOutput(outputTag);` to ensure that the test fails if no exception is thrown. > Calling getSideOutput() and split() on one DataStream causes NPE > > > Key: FLINK-9141 > URL: https://issues.apache.org/jira/browse/FLINK-9141 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Critical > > Calling both {{getSideOutput()}} and {{split()}} on one DataStream causes a > {{NullPointerException}} to be thrown at runtime. > As a work-around one can add a no-op map function before the split() call. 
> Exception: > {code} > Caused by: java.lang.NullPointerException > at > org.apache.flink.streaming.api.collector.selector.DirectedOutput.(DirectedOutput.java:79) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:326) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:128) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:274) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:745) > {code} > Reproducer: > {code} > private static final OutputTag tag = new OutputTag("tag") {}; > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > DataStream dataStream1 = env.fromElements("foo"); > SingleOutputStreamOperator processedStream = dataStream1 > .process(new ProcessFunction() { > @Override > public void processElement(String value, Context ctx, > Collector out) { > } > }); > processedStream.getSideOutput(tag) > .print(); > processedStream > .split(Collections::singletonList) > .select("bar") > .print(); > env.execute(); > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5836: [FLINK-9141][datastream] Fail early when using bot...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5836#discussion_r185942219 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +import org.junit.Test; + +import java.util.Collections; + +/** + * Tests that verify correct behavior when applying split/getSideOutput operations on one {@link DataStream}. 
+ */ +public class SplitSideOutputTest { + + private static final OutputTag outputTag = new OutputTag("outputTag") {}; + + @Test + public void testSideOutputAfterSelectIsForbidden() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + SingleOutputStreamOperator processInput = env.fromElements("foo") + .process(new DummyProcessFunction()); + + processInput.split(Collections::singleton); + + try { + processInput.getSideOutput(outputTag); + } catch (UnsupportedOperationException expected){ + // expected + } + } + + @Test + public void testSelectAfterSideOutputIsForbidden() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + SingleOutputStreamOperator processInput = env.fromElements("foo") + .process(new DummyProcessFunction()); + + processInput.getSideOutput(outputTag); + + try { + processInput.split(Collections::singleton); --- End diff -- same as above ---
[GitHub] flink pull request #5836: [FLINK-9141][datastream] Fail early when using bot...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5836#discussion_r185942147 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +import org.junit.Test; + +import java.util.Collections; + +/** + * Tests that verify correct behavior when applying split/getSideOutput operations on one {@link DataStream}. 
+ */ +public class SplitSideOutputTest { + + private static final OutputTag outputTag = new OutputTag("outputTag") {}; + + @Test + public void testSideOutputAfterSelectIsForbidden() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + SingleOutputStreamOperator processInput = env.fromElements("foo") + .process(new DummyProcessFunction()); + + processInput.split(Collections::singleton); + + try { + processInput.getSideOutput(outputTag); --- End diff -- add `Assert.fail();` after `processInput.getSideOutput(outputTag);` to ensure that the test fails if no exception is thrown. ---
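The pattern the reviewer asks for — fail explicitly when the expected exception is not thrown — can be sketched in plain Java. This is a self-contained stand-in, not the actual Flink test: JUnit's `Assert.fail()` is replaced by throwing an `AssertionError` so the snippet runs without test dependencies, and `operationThatShouldThrow()` stands in for `processInput.getSideOutput(outputTag)`:

```java
// Sketch of the try/fail/catch pattern: if the call under test does NOT
// throw, the explicit failure line makes the test fail instead of the
// test silently passing.
public class ExpectedExceptionPattern {

    // Stand-in for the forbidden operation (e.g. getSideOutput() after split()).
    static void operationThatShouldThrow() {
        throw new UnsupportedOperationException("split() and getSideOutput() cannot be combined");
    }

    public static void main(String[] args) {
        try {
            operationThatShouldThrow();
            // Without this line the test would pass even when no exception is thrown.
            throw new AssertionError("expected UnsupportedOperationException");
        } catch (UnsupportedOperationException expected) {
            // expected path
            System.out.println("caught expected exception");
        }
    }
}
```

The `AssertionError` is deliberately not caught by the `catch (UnsupportedOperationException ...)` clause, so a missing exception surfaces as a test failure.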
[jira] [Comment Edited] (FLINK-6105) Properly handle InterruptedException in HadoopInputFormatBase
[ https://issues.apache.org/jira/browse/FLINK-6105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307281#comment-16307281 ] Ted Yu edited comment on FLINK-6105 at 5/3/18 9:24 PM: --- In flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java : {code} try { Thread.sleep(500); } catch (InterruptedException e1) { // ignore it } {code} The interrupt status should be restored, or an InterruptedIOException should be thrown. was (Author: yuzhih...@gmail.com): In flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java : {code} try { Thread.sleep(500); } catch (InterruptedException e1) { // ignore it } {code} Interrupt status should be restored, or throw InterruptedIOException . > Properly handle InterruptedException in HadoopInputFormatBase > - > > Key: FLINK-6105 > URL: https://issues.apache.org/jira/browse/FLINK-6105 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Major > > When catching InterruptedException, we should throw InterruptedIOException > instead of IOException. > The following example is from HadoopInputFormatBase : > {code} > try { > splits = this.mapreduceInputFormat.getSplits(jobContext); > } catch (InterruptedException e) { > throw new IOException("Could not get Splits.", e); > } > {code} > There may be other places where IOE is thrown. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
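A minimal sketch of the suggested fix (plain Java, not the actual RollingSink code): restore the thread's interrupt flag instead of swallowing the exception, so callers can still observe the interruption:

```java
// Demonstrates restoring the interrupt status after catching
// InterruptedException, instead of ignoring it.
public class InterruptHandling {

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt(); // simulate a pending interrupt
        sleepQuietly(500);                  // sleep() throws immediately and clears the flag;
                                            // the catch block restores it
        System.out.println("interrupted=" + Thread.currentThread().isInterrupted());
    }
}
```

In I/O-heavy code the alternative is to rethrow as `java.io.InterruptedIOException`, which preserves the interruption signal across an `IOException`-only method signature.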
[jira] [Commented] (FLINK-8726) Code highlighting partially broken
[ https://issues.apache.org/jira/browse/FLINK-8726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463056#comment-16463056 ] ASF GitHub Bot commented on FLINK-8726: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5909 > Code highlighting partially broken > -- > > Key: FLINK-8726 > URL: https://issues.apache.org/jira/browse/FLINK-8726 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.5.0 > > > With the recent changes around the documentation build dependencies code > highlighting is no longer fully working. > Sections as below are rendered without any background [like > here|https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html]. > {code} > ~~~bash > # get the hadoop2 package from the Flink download page at > # {{ site.download_url }} > curl -O > tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz > cd flink-{{ site.version }}/ > ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 > ./examples/batch/WordCount.jar > ~~~ > {code} > Sections using the {{\{% highlight java %}}} syntax are still working. > We may have to do a sweep over the docs and port all code sections to the > working syntax. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5909: [FLINK-8726][docs] Fix and normalize code-highligh...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5909 ---
[jira] [Closed] (FLINK-8726) Code highlighting partially broken
[ https://issues.apache.org/jira/browse/FLINK-8726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-8726. --- Resolution: Fixed master: b13c70b60b04e0d51604d0e37868612f77e86299 1.5: 6d0775176af206edb86d8ac9cfff35654208e1e9 > Code highlighting partially broken > -- > > Key: FLINK-8726 > URL: https://issues.apache.org/jira/browse/FLINK-8726 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.5.0 > > > With the recent changes around the documentation build dependencies code > highlighting is no longer fully working. > Sections as below are rendered without any background [like > here|https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html]. > {code} > ~~~bash > # get the hadoop2 package from the Flink download page at > # {{ site.download_url }} > curl -O > tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz > cd flink-{{ site.version }}/ > ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 > ./examples/batch/WordCount.jar > ~~~ > {code} > Sections using the {{\{% highlight java %}}} syntax are still working. > We may have to do a sweep over the docs and port all code sections to the > working syntax. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
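The fix ports sections like the `~~~bash` example above to the Liquid highlight syntax that still renders correctly. A minimal before/after sketch (the surrounding content is illustrative, taken from the issue description):

```liquid
{% highlight bash %}
# get the hadoop2 package from the Flink download page
tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz
cd flink-{{ site.version }}/
{% endhighlight %}
```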
[GitHub] flink issue #5909: [FLINK-8726][docs] Fix and normalize code-highlighting
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5909 merging. ---
[jira] [Commented] (FLINK-8726) Code highlighting partially broken
[ https://issues.apache.org/jira/browse/FLINK-8726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463052#comment-16463052 ] ASF GitHub Bot commented on FLINK-8726: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5909 merging. > Code highlighting partially broken > -- > > Key: FLINK-8726 > URL: https://issues.apache.org/jira/browse/FLINK-8726 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.5.0 > > > With the recent changes around the documentation build dependencies code > highlighting is no longer fully working. > Sections as below are rendered without any background [like > here|https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html]. > {code} > ~~~bash > # get the hadoop2 package from the Flink download page at > # {{ site.download_url }} > curl -O > tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz > cd flink-{{ site.version }}/ > ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 > ./examples/batch/WordCount.jar > ~~~ > {code} > Sections using the {{\{% highlight java %}}} syntax are still working. > We may have to do a sweep over the docs and port all code sections to the > working syntax. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8237) BucketingSink throws NPE when Writer.duplicate returns null
[ https://issues.apache.org/jira/browse/FLINK-8237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463037#comment-16463037 ] ASF GitHub Bot commented on FLINK-8237: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5927 Thanks for the PR @pavel-shvetsov-git! I've left a suggestion to improve the error message. Afterwards the PR should be good to merge. > BucketingSink throws NPE when Writer.duplicate returns null > --- > > Key: FLINK-8237 > URL: https://issues.apache.org/jira/browse/FLINK-8237 > Project: Flink > Issue Type: Bug >Reporter: Gábor Hermann >Priority: Minor > > Users need to look into Flink code to find the cause. We could catch that > null before even running the job. > {code:java} > java.lang.NullPointerException > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:546) > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:441) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) > at > org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206) > at > 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5927: [FLINK-8237] [BucketingSink] Better error message added
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5927 Thanks for the PR @pavel-shvetsov-git! I've left a suggestion to improve the error message. Afterwards the PR should be good to merge. ---
[jira] [Commented] (FLINK-8237) BucketingSink throws NPE when Writer.duplicate returns null
[ https://issues.apache.org/jira/browse/FLINK-8237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463035#comment-16463035 ] ASF GitHub Bot commented on FLINK-8237: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5927#discussion_r185932952 --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java --- @@ -550,6 +550,9 @@ private void openNewPartFile(Path bucketPath, BucketState bucketState) throws Path inProgressPath = getInProgressPathFor(partPath); if (bucketState.writer == null) { bucketState.writer = writerTemplate.duplicate(); + if (bucketState.writer == null) { + throw new RuntimeException("Could not duplicate writer."); --- End diff -- I would add the class name of the `writerTemplate` object and that the class needs to implement the `Writer.duplicate()` method. > BucketingSink throws NPE when Writer.duplicate returns null > --- > > Key: FLINK-8237 > URL: https://issues.apache.org/jira/browse/FLINK-8237 > Project: Flink > Issue Type: Bug >Reporter: Gábor Hermann >Priority: Minor > > Users need to look into Flink code to find the cause. We could catch that > null before even running the job. 
> {code:java} > java.lang.NullPointerException > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:546) > at > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:441) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) > at > org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5927: [FLINK-8237] [BucketingSink] Better error message ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5927#discussion_r185932952 --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java --- @@ -550,6 +550,9 @@ private void openNewPartFile(Path bucketPath, BucketState bucketState) throws Path inProgressPath = getInProgressPathFor(partPath); if (bucketState.writer == null) { bucketState.writer = writerTemplate.duplicate(); + if (bucketState.writer == null) { + throw new RuntimeException("Could not duplicate writer."); --- End diff -- I would add the class name of the `writerTemplate` object and that the class needs to implement the `Writer.duplicate()` method. ---
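A hedged sketch of the reviewer's suggestion — include the writer's class name in the error so users can tell which `Writer.duplicate()` implementation returned null. The `Writer` interface below is a stand-in, not Flink's actual class:

```java
// Sketch: fail fast with a descriptive message when duplicate() returns null,
// instead of letting a NullPointerException surface later.
public class DuplicateCheck {

    // Stand-in for Flink's Writer interface.
    interface Writer {
        Writer duplicate();
    }

    static Writer duplicateOrFail(Writer template) {
        Writer copy = template.duplicate();
        if (copy == null) {
            throw new RuntimeException("Could not duplicate writer: "
                + template.getClass().getName()
                + " must properly implement Writer.duplicate().");
        }
        return copy;
    }

    public static void main(String[] args) {
        Writer broken = new Writer() {
            public Writer duplicate() { return null; } // buggy implementation
        };
        try {
            duplicateOrFail(broken);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage().startsWith("Could not duplicate writer"));
        }
    }
}
```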
[jira] [Commented] (FLINK-8726) Code highlighting partially broken
[ https://issues.apache.org/jira/browse/FLINK-8726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463025#comment-16463025 ] ASF GitHub Bot commented on FLINK-8726: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5909 Thanks for cleaning up the syntax highlighting @zentol! +1 to merge > Code highlighting partially broken > -- > > Key: FLINK-8726 > URL: https://issues.apache.org/jira/browse/FLINK-8726 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.5.0 > > > With the recent changes around the documentation build dependencies code > highlighting is no longer fully working. > Sections as below are rendered without any background [like > here|https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html]. > {code} > ~~~bash > # get the hadoop2 package from the Flink download page at > # {{ site.download_url }} > curl -O > tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz > cd flink-{{ site.version }}/ > ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 > ./examples/batch/WordCount.jar > ~~~ > {code} > Sections using the {{\{% highlight java %}}} syntax are still working. > We may have to do a sweep over the docs and port all code sections to the > working syntax. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5909: [FLINK-8726][docs] Fix and normalize code-highlighting
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5909 Thanks for cleaning up the syntax highlighting @zentol! +1 to merge ---
[jira] [Commented] (FLINK-8978) End-to-end test: Job upgrade
[ https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462758#comment-16462758 ] ASF GitHub Bot commented on FLINK-8978: --- Github user azagrebin commented on the issue: https://github.com/apache/flink/pull/5947 Agreed, I added a constant check of the previous non-null state to its update method. > End-to-end test: Job upgrade > > > Key: FLINK-8978 > URL: https://issues.apache.org/jira/browse/FLINK-8978 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Andrey Zagrebin >Priority: Blocker > Fix For: 1.6.0, 1.5.1 > > > Job upgrades usually happen during the lifetime of a real-world Flink job. > Therefore, we should add an end-to-end test which exactly covers this > scenario. I suggest the following: > # Run the general-purpose testing job FLINK-8971 > # Take a savepoint > # Modify the job by introducing a new operator and changing the order of > others > # Resume the modified job from the savepoint > # Verify that everything went correctly -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5947: [FLINK-8978] Stateful generic stream job upgrade e2e test
Github user azagrebin commented on the issue: https://github.com/apache/flink/pull/5947 Agreed, I added a constant check of the previous non-null state to its update method. ---
[jira] [Updated] (FLINK-9295) FlinkKafkaProducer011 sometimes throws ProducerFencedExceptions when in EXACTLY_ONCE mode
[ https://issues.apache.org/jira/browse/FLINK-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christopher Ng updated FLINK-9295: -- Description: {{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple sinks are used within the same sub-task. This can happen when operator-chaining results in two different sinks in the same topology being chained into a task, and thus into each of its sub-tasks. The problem is that {{TransactionIdsGenerator}} only takes into account the task name, the subtask index, the number of subtasks, and a couple of other things. All the attributes are the same between different {{FlinkKafkaProducer011s}} within the same sub-task, so they get the same transaction ids and one of them ends up failing. was: {{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple sinks are used within the same sub-task. This can happen when operator-chaining results in two different sinks in the same topology being chained into a single sub-task. The problem is that {{TransactionIdsGenerator}} only takes into account the task name, the subtask index, the number of subtasks, and a couple of other things. All the attributes are the same between different {{FlinkKafkaProducer011s}} within the same sub-task, so they get the same transaction ids and one of them ends up failing. > FlinkKafkaProducer011 sometimes throws ProducerFencedExceptions when in > EXACTLY_ONCE mode > - > > Key: FLINK-9295 > URL: https://issues.apache.org/jira/browse/FLINK-9295 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.2 >Reporter: Christopher Ng >Priority: Major > > {{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple > sinks are used within the same sub-task. This can happen when > operator-chaining results in two different sinks in the same topology being > chained into a task, and thus into each of its sub-tasks. 
> The problem is that {{TransactionIdsGenerator}} only takes into account the > task name, the subtask index, the number of subtasks, and a couple of other > things. All the attributes are the same between different > {{FlinkKafkaProducer011s}} within the same sub-task, so they get the same > transaction ids and one of them ends up failing. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9295) FlinkKafkaProducer011 sometimes throws ProducerFencedExceptions when in EXACTLY_ONCE mode
[ https://issues.apache.org/jira/browse/FLINK-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christopher Ng updated FLINK-9295: -- Description: {{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple sinks are used within the same sub-task. This can happen when operator-chaining results in two different sinks in the same topology being chained into a single sub-task. The problem is that {{TransactionIdsGenerator}} only takes into account the task name, the subtask index, the number of subtasks, and a couple of other things. All the attributes are the same between different {{FlinkKafkaProducer011s}} within the same sub-task, so they get the same transaction ids and one of them ends up failing. was: {{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple sinks are used within the same sub-task. This can happen when chaining results in two different sinks in the same topology being chained into a single sub-task. The problem is that {{TransactionIdsGenerator}} only takes into account the task name, the subtask index, the number of subtasks, and a couple of other things. All the attributes are the same between different {{FlinkKafkaProducer011s}} within the same sub-task, so they get the same transaction ids and one of them ends up failing. > FlinkKafkaProducer011 sometimes throws ProducerFencedExceptions when in > EXACTLY_ONCE mode > - > > Key: FLINK-9295 > URL: https://issues.apache.org/jira/browse/FLINK-9295 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.2 >Reporter: Christopher Ng >Priority: Major > > {{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple > sinks are used within the same sub-task. This can happen when > operator-chaining results in two different sinks in the same topology being > chained into a single sub-task. 
> The problem is that {{TransactionIdsGenerator}} only takes into account the > task name, the subtask index, the number of subtasks, and a couple of other > things. All the attributes are the same between different > {{FlinkKafkaProducer011s}} within the same sub-task, so they get the same > transaction ids and one of them ends up failing. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9295) FlinkKafkaProducer011 sometimes throws ProducerFencedExceptions when in EXACTLY_ONCE mode
Christopher Ng created FLINK-9295: - Summary: FlinkKafkaProducer011 sometimes throws ProducerFencedExceptions when in EXACTLY_ONCE mode Key: FLINK-9295 URL: https://issues.apache.org/jira/browse/FLINK-9295 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.4.2 Reporter: Christopher Ng {{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple sinks are used within the same sub-task. This can happen when chaining results in two different sinks in the same topology being chained into a single sub-task. The problem is that {{TransactionIdsGenerator}} only takes into account the task name, the subtask index, the number of subtasks, and a couple of other things. All the attributes are the same between different {{FlinkKafkaProducer011s}} within the same sub-task, so they get the same transaction ids and one of them ends up failing. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
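Why two chained sinks collide can be illustrated with a toy id generator. This is illustrative only — not Flink's actual {{TransactionIdsGenerator}} logic — but it shows the core problem: a prefix derived solely from task-level attributes is identical for every producer inside the same sub-task:

```java
// Toy model of the collision: the transactional-id prefix depends only on
// task name, subtask index, and parallelism, all of which are shared by
// every producer chained into the same sub-task.
public class TxnIdCollision {

    static String txnIdPrefix(String taskName, int subtaskIndex, int parallelism) {
        return taskName + "-" + subtaskIndex + "-" + parallelism;
    }

    public static void main(String[] args) {
        // Two different Kafka sinks chained into one sub-task share all three inputs:
        String sinkA = txnIdPrefix("Source -> SinkA, SinkB", 0, 4);
        String sinkB = txnIdPrefix("Source -> SinkA, SinkB", 0, 4);
        // Identical ids mean one producer fences the other (ProducerFencedException).
        System.out.println("collision=" + sinkA.equals(sinkB));
    }
}
```

A fix would mix in some per-operator attribute (e.g. an operator id) so that the two sinks derive distinct transactional ids.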
[jira] [Commented] (FLINK-9293) SlotPool should check slot id when accepting a slot offer with existing allocation id
[ https://issues.apache.org/jira/browse/FLINK-9293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462665#comment-16462665 ] ASF GitHub Bot commented on FLINK-9293: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5951 Good catch! Thank you for the PR. Will try to review this asap... > SlotPool should check slot id when accepting a slot offer with existing > allocation id > - > > Key: FLINK-9293 > URL: https://issues.apache.org/jira/browse/FLINK-9293 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.5.0 >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > For flip-6, there may be two or more slots assigned to the same slot > allocation. For example, taskExecutor1 registers and assigns allocationID1 to > its slot1, but from taskExecutor1's side the registration times out, so it > registers again; the RM then fails allocationID1 and assigns slot2 on > taskExecutor2 to it. However, taskExecutor1 has already accepted allocationID1, > so taskExecutor1 and taskExecutor2 both offer a slot to the JobMaster with > allocationID1. Currently the slot pool just accepts all slot offers, and this > may leak one slot. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5951: [FLINK-9293] [runtime] SlotPool should check slot id when...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5951 Good catch! Thank you for the PR. Will try to review this asap... ---
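The check described in the issue can be sketched as follows. All names here are hypothetical simplifications, not Flink's actual SlotPool API: when a slot offer arrives for an allocation id that is already fulfilled, accept it only if it refers to the same slot; otherwise reject it so the duplicate slot can be freed instead of leaking:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal model of the proposed SlotPool behavior: track which slot id
// fulfilled each allocation id, and reject offers of a *different* slot
// for an already-fulfilled allocation.
public class SlotOfferCheck {

    static final Map<String, String> allocatedSlots = new HashMap<>(); // allocationId -> slotId

    static boolean offerSlot(String allocationId, String slotId) {
        String existing = allocatedSlots.get(allocationId);
        if (existing != null) {
            // Re-offer of the same slot is fine; a different slot must be rejected.
            return existing.equals(slotId);
        }
        allocatedSlots.put(allocationId, slotId);
        return true;
    }

    public static void main(String[] args) {
        System.out.println(offerSlot("allocationID1", "taskExecutor1-slot1")); // new allocation
        System.out.println(offerSlot("allocationID1", "taskExecutor1-slot1")); // same slot re-offered
        System.out.println(offerSlot("allocationID1", "taskExecutor2-slot2")); // different slot: reject
    }
}
```

Rejecting the second, distinct slot lets the TaskExecutor return it to the ResourceManager, avoiding the leak.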
[jira] [Commented] (FLINK-9235) Add Integration test for Flink-Yarn-Kerberos integration for flip-6
[ https://issues.apache.org/jira/browse/FLINK-9235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462643#comment-16462643 ] ASF GitHub Bot commented on FLINK-9235: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5953 R: @zentol @suez1224 At least the secured ITCase is currently failing for legacy mode. Investigating... > Add Integration test for Flink-Yarn-Kerberos integration for flip-6 > --- > > Key: FLINK-9235 > URL: https://issues.apache.org/jira/browse/FLINK-9235 > Project: Flink > Issue Type: Test >Affects Versions: 1.5.0 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > We need to provide an integration test for flip-6 similar to > YARNSessionFIFOSecuredITCase for the legacy deployment mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9235) Add Integration test for Flink-Yarn-Kerberos integration for flip-6
[ https://issues.apache.org/jira/browse/FLINK-9235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462641#comment-16462641 ] ASF GitHub Bot commented on FLINK-9235: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5901 @suez1224 I did an alternative version in #5953. All YARN tests currently have the problem that they are only executed with the "new" (FLIP-6) mode or legacy mode (only the secured IT Case). In my PR I change that to use the legacy flag that we specify on Travis. Meaning that all YARN tests now run for both configurations. > Add Integration test for Flink-Yarn-Kerberos integration for flip-6 > --- > > Key: FLINK-9235 > URL: https://issues.apache.org/jira/browse/FLINK-9235 > Project: Flink > Issue Type: Test >Affects Versions: 1.5.0 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > We need to provide an integration test for flip-6 similar to > YARNSessionFIFOSecuredITCase for the legacy deployment mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5953: [FLINK-9235] Add Integration test for Flink-Yarn-Kerberos...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5953 R: @zentol @suez1224 At least the secured ITCase is currently failing for legacy mode. Investigating... ---
[GitHub] flink issue #5901: [FLINK-9235][Security] Add integration tests for YARN ker...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5901 @suez1224 I did an alternative version in #5953. All YARN tests currently have the problem that they are only executed with the "new" (FLIP-6) mode or legacy mode (only the secured IT Case). In my PR I change that to use the legacy flag that we specify on Travis. Meaning that all YARN tests now run for both configurations. ---
[jira] [Updated] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type
[ https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-9294: - Description: Most UDF function signatures that include composite types such as *{{MAP}}*, *{{ARRAY}}*, etc. require the user to explicitly override the *{{getParameterType}}* or *{{getResultType}}* method. It should be possible to resolve the composite type based on the function signature. was: Most of the UDF function signatures that includes composite types such as {quote}MAP{quote}, {quote}ARRAY{quote}, etc would require user to override {code:java}getParameterType{code} or {code:java}getResultType{code} method explicitly. It should be able to resolve the composite type based on the function signature. > Improve type inference for UDFs with composite parameter or result type > > > Key: FLINK-9294 > URL: https://issues.apache.org/jira/browse/FLINK-9294 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Rong Rong >Priority: Major > > Most UDF function signatures that include composite types such as > *{{MAP}}*, *{{ARRAY}}*, etc. require the user to explicitly override the > *{{getParameterType}}* or *{{getResultType}}* method. > It should be possible to resolve the composite type based on the function > signature. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type
[ https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong reassigned FLINK-9294: Assignee: Rong Rong > Improve type inference for UDFs with composite parameter or result type > > > Key: FLINK-9294 > URL: https://issues.apache.org/jira/browse/FLINK-9294 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Most of the UDF function signatures that includes composite types such as > *{{MAP}}*, *{{ARRAY}}*, etc would require user to override > *{{getParameterType}}* or *{{getResultType}}* method explicitly. > It should be able to resolve the composite type based on the function > signature. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type
[ https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-9294: - Description: Most of the UDF function signatures that includes composite types such as {quote}MAP{quote}, {quote}ARRAY{quote}, etc would require user to override {code:java}getParameterType{code} or {code:java}getResultType{code} method explicitly. It should be able to resolve the composite type based on the function signature. was: Most of the UDF function signatures that includes composite types such as {code:java}MAP{code}, {code:java}ARRAY{code}, etc would require user to override {code:java}getParameterType{code} or {code:java}getResultType{code} method explicitly. It should be able to resolve the composite type based on the function signature. > Improve type inference for UDFs with composite parameter or result type > > > Key: FLINK-9294 > URL: https://issues.apache.org/jira/browse/FLINK-9294 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Rong Rong >Priority: Major > > Most of the UDF function signatures that includes composite types such as > {quote}MAP{quote}, {quote}ARRAY{quote}, etc would require user to override > {code:java}getParameterType{code} or {code:java}getResultType{code} method > explicitly. > It should be able to resolve the composite type based on the function > signature. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type
[ https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-9294: - Description: Most of the UDF function signatures that includes composite types such as {code:java}MAP{code}, {code:java}ARRAY{code}, etc would require user to override {code:java}getParameterType{code} or {code:java}getResultType{code} method explicitly. It should be able to resolve the composite type based on the function signature. was: Most of the UDF function signatures that includes composite types such as {code}MAP{/code} {{ARRAY}}, etc would require user to override {{getParameterType}} or {{getResultType}} method explicitly. It should be able to resolve the composite type based on the function signature. > Improve type inference for UDFs with composite parameter or result type > > > Key: FLINK-9294 > URL: https://issues.apache.org/jira/browse/FLINK-9294 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Rong Rong >Priority: Major > > Most of the UDF function signatures that includes composite types such as > {code:java}MAP{code}, {code:java}ARRAY{code}, etc would require user to > override {code:java}getParameterType{code} or {code:java}getResultType{code} > method explicitly. > It should be able to resolve the composite type based on the function > signature. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type
[ https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-9294: - Description: Most of the UDF function signatures that includes composite types such as {code}MAP{/code} {{ARRAY}}, etc would require user to override {{getParameterType}} or {{getResultType}} method explicitly. It should be able to resolve the composite type based on the function signature. was: For now most of the UDF function signatures that includes composite types such as {{MAP}} {{ARRAY}}, etc would require user to override {{getParameterType}} or {{getResultType}} method explicitly. It should be able to resolve the composite type based on the function signature. > Improve type inference for UDFs with composite parameter or result type > > > Key: FLINK-9294 > URL: https://issues.apache.org/jira/browse/FLINK-9294 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Rong Rong >Priority: Major > > Most of the UDF function signatures that includes composite types such as > {code}MAP{/code} {{ARRAY}}, etc would require user to override > {{getParameterType}} or {{getResultType}} method explicitly. > It should be able to resolve the composite type based on the function > signature. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type
[ https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-9294: - Description: For now most of the UDF function signatures that includes composite types such as {{MAP}} {{ARRAY}}, etc would require user to override {{getParameterType}} or {{getResultType}} method explicitly. It should be able to resolve the composite type based on the function signature. was: For now most of the UDF function signatures that includes composite types such as `MAP` `ARRAY`, etc would require user to override `getParameterType` or `getResultType` method explicitly. It should be able to resolve the composite type based on the function signature. > Improve type inference for UDFs with composite parameter or result type > > > Key: FLINK-9294 > URL: https://issues.apache.org/jira/browse/FLINK-9294 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Reporter: Rong Rong >Priority: Major > > For now most of the UDF function signatures that includes composite types > such as {{MAP}} {{ARRAY}}, etc would require user to override > {{getParameterType}} or {{getResultType}} method explicitly. > It should be able to resolve the composite type based on the function > signature. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type
Rong Rong created FLINK-9294: Summary: Improve type inference for UDFs with composite parameter or result type Key: FLINK-9294 URL: https://issues.apache.org/jira/browse/FLINK-9294 Project: Flink Issue Type: Improvement Components: Table API SQL Reporter: Rong Rong For now, most UDF function signatures that include composite types such as `MAP`, `ARRAY`, etc. require the user to override the `getParameterType` or `getResultType` method explicitly. It should be possible to resolve the composite type from the function signature. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
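The improvement requested here — inferring composite parameter and result types from the UDF's signature instead of forcing a `getResultType`/`getParameterType` override — boils down to reading the generic signature of the `eval` method. Below is a minimal sketch using plain JDK reflection; the `MyUdf` class and `resolveResultType` helper are hypothetical illustrations, not Flink's actual `TypeExtractor` logic.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

public class SignatureTypeInference {

    // Hypothetical UDF: returns a MAP<String, Integer> without overriding getResultType().
    public static class MyUdf {
        public Map<String, Integer> eval(List<String> keys) {
            throw new UnsupportedOperationException("signature only");
        }
    }

    /** Resolve the generic return type of the eval method via reflection. */
    public static Type resolveResultType(Class<?> udfClass) throws NoSuchMethodException {
        for (Method m : udfClass.getMethods()) {
            if (m.getName().equals("eval")) {
                return m.getGenericReturnType();
            }
        }
        throw new NoSuchMethodException("no eval method on " + udfClass.getName());
    }

    public static void main(String[] args) throws Exception {
        Type result = resolveResultType(MyUdf.class);
        // Generic signatures survive erasure in class metadata, so both the
        // raw type and the type arguments are recoverable from the signature.
        ParameterizedType pt = (ParameterizedType) result;
        System.out.println(pt.getRawType());                // interface java.util.Map
        System.out.println(pt.getActualTypeArguments()[0]); // class java.lang.String
    }
}
```

The same walk over `getGenericParameterTypes()` would cover composite parameter types.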
[jira] [Commented] (FLINK-9235) Add Integration test for Flink-Yarn-Kerberos integration for flip-6
[ https://issues.apache.org/jira/browse/FLINK-9235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462615#comment-16462615 ] ASF GitHub Bot commented on FLINK-9235: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/5953 [FLINK-9235] Add Integration test for Flink-Yarn-Kerberos integration for flip-6 Alternative version of #5901 You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-9235-flip-6-yarn-secured-test Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5953.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5953 commit 9ca7852fe1a6f1a86d05ba2f7851199e43411579 Author: Aljoscha KrettekDate: 2018-05-03T14:23:29Z Remove special-case krb5.conf code from YARN runners commit b4fb889c911ccef0521cd9ed8839190a12f2d4a5 Author: Aljoscha Krettek Date: 2018-05-03T14:27:40Z [FLINK-9235] Test new FLIP-6 code in YARNSessionFIFOSecuredITCase Before, always setting mode to LEGACY_MODE when security settings are present caused the test never to run with the new code. For this, we also need to actually execute an example. Otherwise, no TaskExecutors would be brought up. > Add Integration test for Flink-Yarn-Kerberos integration for flip-6 > --- > > Key: FLINK-9235 > URL: https://issues.apache.org/jira/browse/FLINK-9235 > Project: Flink > Issue Type: Test >Affects Versions: 1.5.0 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > We need to provide an integration test for flip-6 similar to > YARNSessionFIFOSecuredITCase for the legacy deployment mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5953: [FLINK-9235] Add Integration test for Flink-Yarn-K...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/5953 [FLINK-9235] Add Integration test for Flink-Yarn-Kerberos integration for flip-6 Alternative version of #5901 You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-9235-flip-6-yarn-secured-test Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5953.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5953 commit 9ca7852fe1a6f1a86d05ba2f7851199e43411579 Author: Aljoscha KrettekDate: 2018-05-03T14:23:29Z Remove special-case krb5.conf code from YARN runners commit b4fb889c911ccef0521cd9ed8839190a12f2d4a5 Author: Aljoscha Krettek Date: 2018-05-03T14:27:40Z [FLINK-9235] Test new FLIP-6 code in YARNSessionFIFOSecuredITCase Before, always setting mode to LEGACY_MODE when security settings are present caused the test never to run with the new code. For this, we also need to actually execute an example. Otherwise, no TaskExecutors would be brought up. ---
[jira] [Commented] (FLINK-8978) End-to-end test: Job upgrade
[ https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462608#comment-16462608 ] ASF GitHub Bot commented on FLINK-8978: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5947 Please also double-check; it seems there are files without a license header. > End-to-end test: Job upgrade > > > Key: FLINK-8978 > URL: https://issues.apache.org/jira/browse/FLINK-8978 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Andrey Zagrebin >Priority: Blocker > Fix For: 1.6.0, 1.5.1 > > > Job upgrades usually happen during the lifetime of a real-world Flink job. > Therefore, we should add an end-to-end test which exactly covers this > scenario. I suggest doing the following: > # run the general purpose testing job FLINK-8971 > # take a savepoint > # Modify the job by introducing a new operator and changing the order of > others > # Resume the modified job from the savepoint > # Verify that everything went correctly -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5947: [FLINK-8978] Stateful generic stream job upgrade e2e test
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5947 Please also double-check; it seems there are files without a license header. ---
[jira] [Commented] (FLINK-8978) End-to-end test: Job upgrade
[ https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462591#comment-16462591 ] ASF GitHub Bot commented on FLINK-8978: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5947#discussion_r185840683 --- Diff: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java --- @@ -33,21 +33,28 @@ private static final long serialVersionUID = -1205814329756790916L; private transient ValueState valueState; + private transient boolean afterRestoration; private final TypeSerializer typeSerializer; private final JoinFunctionstateValueGenerator; + private final RestoredStateVerifier restoredStateVerifier; public ArtificialValueStateBuilder( String stateName, JoinFunction stateValueGenerator, - TypeSerializer typeSerializer) { - + TypeSerializer typeSerializer, + RestoredStateVerifier restoredStateVerifier) { super(stateName); this.typeSerializer = typeSerializer; this.stateValueGenerator = stateValueGenerator; + this.restoredStateVerifier = restoredStateVerifier; } @Override public void artificialStateForElement(IN event) throws Exception { + if (afterRestoration) { --- End diff -- As this is a test job, I think it might not hurt to just check every element after a restore. > End-to-end test: Job upgrade > > > Key: FLINK-8978 > URL: https://issues.apache.org/jira/browse/FLINK-8978 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Andrey Zagrebin >Priority: Blocker > Fix For: 1.6.0, 1.5.1 > > > Job upgrades usually happen during the lifetime of a real world Flink job. > Therefore, we should add an end-to-end test which exactly covers this > scenario. 
I suggest doing the following: > # run the general purpose testing job FLINK-8971 > # take a savepoint > # Modify the job by introducing a new operator and changing the order of > others > # Resume the modified job from the savepoint > # Verify that everything went correctly -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5947: [FLINK-8978] Stateful generic stream job upgrade e...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5947#discussion_r185840683 --- Diff: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java --- @@ -33,21 +33,28 @@ private static final long serialVersionUID = -1205814329756790916L; private transient ValueState valueState; + private transient boolean afterRestoration; private final TypeSerializer typeSerializer; private final JoinFunctionstateValueGenerator; + private final RestoredStateVerifier restoredStateVerifier; public ArtificialValueStateBuilder( String stateName, JoinFunction stateValueGenerator, - TypeSerializer typeSerializer) { - + TypeSerializer typeSerializer, + RestoredStateVerifier restoredStateVerifier) { super(stateName); this.typeSerializer = typeSerializer; this.stateValueGenerator = stateValueGenerator; + this.restoredStateVerifier = restoredStateVerifier; } @Override public void artificialStateForElement(IN event) throws Exception { + if (afterRestoration) { --- End diff -- As this is a test job, I think it might not hurt to just check every element after a restore. ---
[jira] [Commented] (FLINK-8978) End-to-end test: Job upgrade
[ https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462589#comment-16462589 ] ASF GitHub Bot commented on FLINK-8978: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5947#discussion_r185840213 --- Diff: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java --- @@ -33,21 +33,28 @@ private static final long serialVersionUID = -1205814329756790916L; private transient ValueState valueState; + private transient boolean afterRestoration; private final TypeSerializer typeSerializer; private final JoinFunctionstateValueGenerator; + private final RestoredStateVerifier restoredStateVerifier; public ArtificialValueStateBuilder( String stateName, JoinFunction stateValueGenerator, - TypeSerializer typeSerializer) { - + TypeSerializer typeSerializer, + RestoredStateVerifier restoredStateVerifier) { super(stateName); this.typeSerializer = typeSerializer; this.stateValueGenerator = stateValueGenerator; + this.restoredStateVerifier = restoredStateVerifier; } @Override public void artificialStateForElement(IN event) throws Exception { + if (afterRestoration) { --- End diff -- I find this way of checking the state rather invasive not completely thorough. There is now a pretty tight coupling between creating artificial state and checking something about it on restore. In particular, there is a hardcoded way now when to check. This makes it harder to reuse the classes in further test jobs that we might want to build with them. Can't we use a way that is more based on composition? For example, wrap the state builder in a state checker? This is also only doing just one check, so if the input element has a key that we never encountered, the state is `null` and there might be no check. 
[GitHub] flink pull request #5947: [FLINK-8978] Stateful generic stream job upgrade e...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5947#discussion_r185840213 --- Diff: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java --- @@ -33,21 +33,28 @@ private static final long serialVersionUID = -1205814329756790916L; private transient ValueState valueState; + private transient boolean afterRestoration; private final TypeSerializer typeSerializer; private final JoinFunction stateValueGenerator; + private final RestoredStateVerifier restoredStateVerifier; public ArtificialValueStateBuilder( String stateName, JoinFunction stateValueGenerator, - TypeSerializer typeSerializer) { - + TypeSerializer typeSerializer, + RestoredStateVerifier restoredStateVerifier) { super(stateName); this.typeSerializer = typeSerializer; this.stateValueGenerator = stateValueGenerator; + this.restoredStateVerifier = restoredStateVerifier; } @Override public void artificialStateForElement(IN event) throws Exception { + if (afterRestoration) { --- End diff -- I find this way of checking the state rather invasive and not completely thorough. There is now a pretty tight coupling between creating artificial state and checking something about it on restore. In particular, the decision of when to check is now hardcoded. This makes it harder to reuse the classes in further test jobs that we might want to build with them. Can't we use an approach that is more based on composition? For example, wrap the state builder in a state checker? This also only does one check, so if the input element has a key that we never encountered, the state is `null` and there might be no check. ---
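The composition the reviewer asks for — wrapping the state builder in a separate checker instead of hard-wiring the restore check into the builder itself — could be sketched as a plain decorator. The interface and class names below are invented for illustration and do not match the PR's actual types:

```java
import java.util.ArrayList;
import java.util.List;

public class StateBuilderComposition {

    /** Minimal stand-in for the artificial-state builder used in the test job. */
    interface ArtificialStateBuilder<IN> {
        void artificialStateForElement(IN event) throws Exception;
    }

    /** Decorator that records/verifies elements seen after a restore, leaving the builder untouched. */
    static class VerifyingStateBuilder<IN> implements ArtificialStateBuilder<IN> {
        private final ArtificialStateBuilder<IN> delegate;
        private final List<IN> verified = new ArrayList<>();
        private final boolean afterRestore;

        VerifyingStateBuilder(ArtificialStateBuilder<IN> delegate, boolean afterRestore) {
            this.delegate = delegate;
            this.afterRestore = afterRestore;
        }

        @Override
        public void artificialStateForElement(IN event) throws Exception {
            if (afterRestore) {
                // Check every element after restore (as the reviewer suggests),
                // instead of a single hardcoded check inside the builder.
                verified.add(event);
            }
            delegate.artificialStateForElement(event);
        }

        List<IN> verifiedElements() {
            return verified;
        }
    }
}
```

Because the checker is a wrapper, the builder stays reusable in other test jobs that don't need restore verification.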
[jira] [Commented] (FLINK-9245) Can't create a BucketingSink with a provided Configuration if no hadoop defaults
[ https://issues.apache.org/jira/browse/FLINK-9245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462583#comment-16462583 ] Aljoscha Krettek commented on FLINK-9245: - I think the problem is that there really is no authority in the path you specify, and there is no default authority. The part between the second and third {{/}} is where the authority should be, mostly a hostname. For example {{hdfs://my-namenode:50030/user/${USER}/application_name/}} or {{hdfs://localhost:50030/user/${USER}/application_name/}}. Does that help? > Can't create a BucketingSink with a provided Configuration if no hadoop > defaults > > > Key: FLINK-9245 > URL: https://issues.apache.org/jira/browse/FLINK-9245 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.4.2 >Reporter: Julien Cuquemelle >Priority: Minor > Fix For: 1.6.0 > > > We build integration tests using this kind of code: > {code:java} > val bucketingSink = new > BucketingSink[Row]("hdfs:///user/${USER}/application_name/") > bucketingSink.setFSConfig(hadoopRule.getConfiguration.hdfs) > bucketingSink.setBucketer(new DateTimeBucketer[Row]("yyyy-MM-dd--HHmm")) > outputStream.addSink(bucketingSink) > {code} > Here, the hadoopRule provides a valid HDFS config that should allow this > kind of code to run on a machine with no HADOOP_HOME or HADOOP_CONF_DIR set > up, like a developer workstation or a Jenkins slave. > When running this code on such a machine, the .createHadoopFileSystem(...) 
> fails with > {noformat} > The given file system URI (hdfs:///user/$USER/application_name/) did not > describe the authority > at > org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:149) > at > org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401){noformat} > because it tries to instantiate the fileSystem from a default configuration > in .getUnguardedFileSystem(); as the default conf doesn't exist, the default > filesystem resolves to "file:///", and the URI consistency check fails because > no authority can be found. So the whole filesystem creation > fails before actually trying to use the provided config. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
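The authority rule Aljoscha describes is easy to check with the JDK's own URI parser: in {{hdfs:///user/...}} the component between the second and third slash is empty, so there is no authority, while {{hdfs://localhost:50030/...}} carries one. A small standalone sketch (the paths are placeholders, not the reporter's actual ones):

```java
import java.net.URI;

public class UriAuthorityCheck {
    public static void main(String[] args) {
        URI noAuthority = URI.create("hdfs:///user/someone/application_name/");
        URI withAuthority = URI.create("hdfs://localhost:50030/user/someone/application_name/");

        // Nothing between the second and third '/': the authority is absent.
        System.out.println(noAuthority.getAuthority());   // null
        // Host and port fill the authority slot.
        System.out.println(withAuthority.getAuthority()); // localhost:50030
    }
}
```

This matches the error: with no authority in the URI and no default Hadoop configuration to fall back on, the consistency check has nothing to resolve against.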
[jira] [Updated] (FLINK-9293) SlotPool should check slot id when accepting a slot offer with existing allocation id
[ https://issues.apache.org/jira/browse/FLINK-9293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-9293: Fix Version/s: 1.5.0 > SlotPool should check slot id when accepting a slot offer with existing > allocation id > - > > Key: FLINK-9293 > URL: https://issues.apache.org/jira/browse/FLINK-9293 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.5.0 >Reporter: shuai.xu >Assignee: shuai.xu >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > For flip-6, there may be two or more slots assigned to the same slot > allocation. For example, taskExecutor1 registers, and the RM assigns allocationID1 to > its slot1; but from taskExecutor1's side the registration times out, and it > registers again, so the RM fails allocationID1 and assigns slot2 on > taskExecutor2 to it. But taskExecutor1 has already accepted allocationID1, > so taskExecutor1 and taskExecutor2 both offer a slot to the job master with > allocationID1. Currently the slot pool just accepts all slot offers, which may > leak one slot. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
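The fix described in this ticket — rejecting a second slot offer that reuses an already-accepted allocation id but comes from a different slot — can be illustrated with a small map-based guard. This is a sketch only: allocation and slot ids are plain strings here rather than Flink's AllocationID/SlotID types, and the real SlotPool logic is more involved.

```java
import java.util.HashMap;
import java.util.Map;

public class SlotOfferGuard {

    // allocation id -> slot id of the offer that was accepted first
    private final Map<String, String> acceptedSlots = new HashMap<>();

    /**
     * Accept an offer only if the allocation id is new, or the offer comes
     * from the exact slot that was accepted before. A second offer for the
     * same allocation id from a *different* slot is rejected, so the extra
     * slot can be returned to the resource manager instead of leaking.
     */
    public boolean offerSlot(String allocationId, String slotId) {
        String existing = acceptedSlots.putIfAbsent(allocationId, slotId);
        return existing == null || existing.equals(slotId);
    }
}
```

In the scenario from the ticket, taskExecutor1's re-offer of allocationID1 from its own slot is idempotent and accepted, while taskExecutor2's offer with the same allocation id is rejected.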
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462563#comment-16462563 ] ASF GitHub Bot commented on FLINK-9169: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5950 Please also check the travis build, some related tests seem to fail: Tests in error: TypeSerializerSerializationUtilTest.testSerializerAndConfigPairsSerializationWithSerializerDeserializationFailures:236 » IO TypeSerializerSerializationUtilTest.testSerializerSerializationWithClassNotFound:109 » IO TypeSerializerSerializationUtilTest.testSerializerSerializationWithInvalidClass:149 » IO PojoSerializerTest.testSerializerSerializationFailureResilience:570 » IO > NPE when restoring from old savepoint and state serializer could not be > deserialized > > > Key: FLINK-9169 > URL: https://issues.apache.org/jira/browse/FLINK-9169 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Till Rohrmann >Assignee: Tzu-Li (Gordon) Tai >Priority: Major > Fix For: 1.5.0 > > > A user reported to have observed the following exception when restoring a > Flink job from a 1.3 savepoint with Flink 1.4. > {code} > 2018-04-02 21:44:18,146 INFO org.apache.flink.runtime.taskmanager.Task > - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65 > 6fa6) switched from RUNNING to FAILED. > java.lang.IllegalStateException: Could not initialize keyed state backend. 
> at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NullPointerException > at > org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58) > at > org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.(RegisteredKeyedBackendStateMetaInfo.java:53) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateB > ackend.java:1216) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeye > dStateBackend.java:1153) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1 > 139) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283) > ... 6 more > {code} > Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create > {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} where the > {{stateSerializer}} can be {{null}}. 
That by itself is not the problem; however, in > {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a > {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}}, > where we null-check the state serializer. This will then fail with an > uninformative NPE. > I think the same should happen when resuming with Flink 1.5 from a 1.4 > savepoint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5950 Please also check the travis build, some related tests seem to fail: Tests in error: TypeSerializerSerializationUtilTest.testSerializerAndConfigPairsSerializationWithSerializerDeserializationFailures:236 » IO TypeSerializerSerializationUtilTest.testSerializerSerializationWithClassNotFound:109 » IO TypeSerializerSerializationUtilTest.testSerializerSerializationWithInvalidClass:149 » IO PojoSerializerTest.testSerializerSerializationFailureResilience:570 » IO ---
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462554#comment-16462554 ] ASF GitHub Bot commented on FLINK-9169: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5950#discussion_r185828137 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java --- @@ -200,7 +197,7 @@ public static void writeSerializersAndConfigsWithResilience( for (int i = 0; i < numSerializersAndConfigSnapshots; i++) { bufferWithPos.setPosition(offsets[i * 2]); - serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader); + serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader, true); --- End diff -- The problem is that this method might mix too many things together, that is also again visible in the complex return type and e.g. many call sites are only interested in the first element of the list. Wonder if we should break this up in dedicated steps (serializer, config) and let the callers invoke them one by one, so that we can handle exceptions on a higher level and make decisions about if we need to have a serializer there. > NPE when restoring from old savepoint and state serializer could not be > deserialized > > > Key: FLINK-9169 > URL: https://issues.apache.org/jira/browse/FLINK-9169 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Till Rohrmann >Assignee: Tzu-Li (Gordon) Tai >Priority: Major > Fix For: 1.5.0 > > > A user reported to have observed the following exception when restoring a > Flink job from a 1.3 savepoint with Flink 1.4. > {code} > 2018-04-02 21:44:18,146 INFO org.apache.flink.runtime.taskmanager.Task > - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65 > 6fa6) switched from RUNNING to FAILED. > java.lang.IllegalStateException: Could not initialize keyed state backend. 
> at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NullPointerException > at > org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58) > at > org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.(RegisteredKeyedBackendStateMetaInfo.java:53) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateB > ackend.java:1216) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeye > dStateBackend.java:1153) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1 > 139) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283) > ... 6 more > {code} > Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create > {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} where the > {{stateSerializer}} can be {{null}}. 
This is not the problem; however, in > {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a > {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}}, > where we null-check the state serializer. This then fails with an > uninformative NPE. > I think the same would happen when resuming with Flink 1.5 from a 1.4 > savepoint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462547#comment-16462547 ] ASF GitHub Bot commented on FLINK-9169: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5950#discussion_r185825767 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java --- @@ -200,7 +197,7 @@ public static void writeSerializersAndConfigsWithResilience( for (int i = 0; i < numSerializersAndConfigSnapshots; i++) { bufferWithPos.setPosition(offsets[i * 2]); - serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader); + serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader, true); --- End diff -- This could be the place where we catch an `UnloadableSerializerException`, but if we let the caller do the iteration from 0 to `numSerializersAndConfigSnapshots`, we can push it out even further. Why is it helpful to create a list here? Otherwise we could do the exception handling in the caller, in a more fine-grained way. > NPE when restoring from old savepoint and state serializer could not be > deserialized > > > Key: FLINK-9169 > URL: https://issues.apache.org/jira/browse/FLINK-9169 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Till Rohrmann >Assignee: Tzu-Li (Gordon) Tai >Priority: Major > Fix For: 1.5.0 > > > A user reported to have observed the following exception when restoring a > Flink job from a 1.3 savepoint with Flink 1.4. > {code} > 2018-04-02 21:44:18,146 INFO org.apache.flink.runtime.taskmanager.Task > - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d656fa6) switched from RUNNING to FAILED. > java.lang.IllegalStateException: Could not initialize keyed state backend. 
> at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NullPointerException > at > org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58) > at > org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283) > ... 6 more > {code} > Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create > {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} where the > {{stateSerializer}} can be {{null}}. 
This is not the problem; however, in > {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a > {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}}, > where we null-check the state serializer. This then fails with an > uninformative NPE. > I think the same would happen when resuming with Flink 1.5 from a 1.4 > savepoint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
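The refactoring sketched in these review comments — dedicated per-element read steps driven by the caller, with exception handling pushed up — could look roughly like this. This is a hypothetical illustration, not Flink's actual code: `readSerializerAt`, the `String` stand-ins for serializers, and the `"<dummy>"` placeholder are all invented for the sketch.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the caller-driven variant discussed above: one
// dedicated step per element instead of one bulk method that mixes reading,
// list building, and dummy creation.
public class CallerDrivenRead {

    // Dedicated step: read a single serializer; a failure surfaces to the
    // caller instead of being handled inside the bulk helper.
    static String readSerializerAt(String[] raw, int i) throws IOException {
        String s = raw[i];
        if (s == null) {
            throw new IOException("serializer " + i + " could not be loaded");
        }
        return s;
    }

    // The caller owns the loop from 0 to n and decides, per element, whether
    // an unreadable serializer is fatal or can be replaced by a placeholder.
    static List<String> readAll(String[] raw, boolean useDummyPlaceholder) throws IOException {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < raw.length; i++) {
            try {
                result.add(readSerializerAt(raw, i));
            } catch (IOException e) {
                if (!useDummyPlaceholder) {
                    throw e;  // fine-grained: fail exactly at the broken element
                }
                result.add("<dummy>");  // higher-level decision, made here
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readAll(new String[]{"LongSerializer", null}, true));
    }
}
```

With this shape, the complex list-of-pairs return type disappears: call sites that only care about the serializer simply never invoke the config step.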
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462534#comment-16462534 ] ASF GitHub Bot commented on FLINK-9169: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5950#discussion_r185821296 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java --- @@ -373,15 +370,14 @@ public void read(DataInputView in) throws IOException { Thread.currentThread().setContextClassLoader(userClassLoader); typeSerializer = (TypeSerializer) ois.readObject(); - } catch (ClassNotFoundException | InvalidClassException e) { + } catch (Exception e) { if (useDummyPlaceholder) { // we create a dummy so that all the information is not lost when we get a new checkpoint before receiving // a proper typeserializer from the user - typeSerializer = - new UnloadableDummyTypeSerializer<>(buffer); - LOG.warn("Could not find requested TypeSerializer class in classpath. Created dummy.", e); + typeSerializer = new UnloadableDummyTypeSerializer<>(buffer); --- End diff -- Some food for thought, even if it is not introduced by this PR: why can we not introduce a special `UnloadableSerializerException extends IOException` that holds a field with the byte array in `buffer` and let it bubble up to a higher-level component? If that component wants to introduce dummies, it can do so from the bytes in the caught exception; if not, it can forward the exception. Then we would not have to hand down this flag but let the higher-level component decide. What do you think? 
> NPE when restoring from old savepoint and state serializer could not be > deserialized > > > Key: FLINK-9169 > URL: https://issues.apache.org/jira/browse/FLINK-9169 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Till Rohrmann >Assignee: Tzu-Li (Gordon) Tai >Priority: Major > Fix For: 1.5.0 > > > A user reported to have observed the following exception when restoring a > Flink job from a 1.3 savepoint with Flink 1.4. > {code} > 2018-04-02 21:44:18,146 INFO org.apache.flink.runtime.taskmanager.Task > - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d656fa6) switched from RUNNING to FAILED. > java.lang.IllegalStateException: Could not initialize keyed state backend. > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NullPointerException > at > org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58) > at > org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283) > ... 6 more > {code} > Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create > {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} where the > {{stateSerializer}} can be {{null}}. This is not the problem; however, in > {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a > {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}}, > where we null-check the state serializer. This then fails with an > uninformative NPE. > I think the same would happen when resuming with Flink 1.5 from a 1.4 > savepoint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
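The proposal in the comment above — an exception type that carries the raw serializer bytes so that a higher-level component can decide whether to build a dummy or rethrow — could be sketched as follows. All class and method names are invented for illustration; this is not the change that was actually merged into Flink.

```java
import java.io.IOException;

// Hypothetical sketch of the proposed UnloadableSerializerException that
// carries the unreadable bytes up to a higher-level component.
class UnloadableSerializerException extends IOException {
    private final byte[] serializerBytes;

    UnloadableSerializerException(byte[] serializerBytes, Throwable cause) {
        super("Could not deserialize TypeSerializer", cause);
        this.serializerBytes = serializerBytes.clone();
    }

    byte[] getSerializerBytes() {
        return serializerBytes.clone();
    }
}

public class BubbleUpSketch {
    // Low-level reader: no useDummyPlaceholder flag any more; it simply
    // lets the exception bubble up with the bytes attached.
    static String readSerializer(byte[] bytes) throws UnloadableSerializerException {
        // Simulate a ClassNotFoundException hit during deserialization.
        throw new UnloadableSerializerException(bytes,
                new ClassNotFoundException("com.example.MySerializer"));
    }

    // Higher-level component: it alone decides whether to forward the
    // failure or to build a dummy from the carried bytes.
    static String readOrDummy(byte[] bytes) {
        try {
            return readSerializer(bytes);
        } catch (UnloadableSerializerException e) {
            return "dummy(" + e.getSerializerBytes().length + " bytes)";
        }
    }

    public static void main(String[] args) {
        System.out.println(readOrDummy(new byte[]{1, 2, 3}));  // prints dummy(3 bytes)
    }
}
```

The design point is that the flag-passing disappears: callers that cannot tolerate a missing serializer simply let the exception propagate, while restore paths that can build a placeholder recover the bytes from the exception itself.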
[jira] [Commented] (FLINK-9253) Make buffer count per InputGate always #channels*buffersPerChannel + ExclusiveBuffers
[ https://issues.apache.org/jira/browse/FLINK-9253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462535#comment-16462535 ] ASF GitHub Bot commented on FLINK-9253: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5923#discussion_r185819830 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java --- @@ -228,20 +228,19 @@ public void setupPartition(ResultPartition partition) throws IOException { @VisibleForTesting public void setupInputGate(SingleInputGate gate) throws IOException { BufferPool bufferPool = null; - int maxNumberOfMemorySegments; try { if (enableCreditBased) { - maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ? - extraNetworkBuffersPerGate : Integer.MAX_VALUE; - // assign exclusive buffers to input channels directly and use the rest for floating buffers - gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel); - bufferPool = networkBufferPool.createBufferPool(0, maxNumberOfMemorySegments); + int nrExclusiveMemorySegments = gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel); + int maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ? 
+ gate.getNumberOfInputChannels() * networkBuffersPerChannel + + extraNetworkBuffersPerGate - nrExclusiveMemorySegments : Integer.MAX_VALUE; + bufferPool = networkBufferPool + .createBufferPool(0, maxNumberOfMemorySegments); --- End diff -- I think that expressed this way: ``` if (enableCreditBased) { int desiredMaxNumberOfMemorySegments = gate.getNumberOfInputChannels() * networkBuffersPerChannel + extraNetworkBuffersPerGate; int assignedExclusiveMemorySegments = gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel); int floatingMemorySegments = desiredMaxNumberOfMemorySegments - assignedExclusiveMemorySegments; bufferPool = networkBufferPool .createBufferPool(0, gate.getConsumedPartitionType().isBounded() ? floatingMemorySegments : Integer.MAX_VALUE); } ``` it's easier to understand and allows us to skip the redundant comment. In particular, the current `maxNumberOfMemorySegments` is a strange name. > Make buffer count per InputGate always #channels*buffersPerChannel + > ExclusiveBuffers > - > > Key: FLINK-9253 > URL: https://issues.apache.org/jira/browse/FLINK-9253 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Fix For: 1.5.0 > > > The credit-based flow control path assigns exclusive buffers only to remote > channels (which makes sense since local channels don't use any own buffers). > However, this is a bit intransparent with respect to how much data may be in > buffers since this depends on the actual schedule of the job and not the job > graph. 
[jira] [Commented] (FLINK-9253) Make buffer count per InputGate always #channels*buffersPerChannel + ExclusiveBuffers
[ https://issues.apache.org/jira/browse/FLINK-9253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462536#comment-16462536 ] ASF GitHub Bot commented on FLINK-9253: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5923#discussion_r185817506 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java --- @@ -228,20 +228,19 @@ public void setupPartition(ResultPartition partition) throws IOException { @VisibleForTesting public void setupInputGate(SingleInputGate gate) throws IOException { BufferPool bufferPool = null; - int maxNumberOfMemorySegments; try { if (enableCreditBased) { - maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ? - extraNetworkBuffersPerGate : Integer.MAX_VALUE; - // assign exclusive buffers to input channels directly and use the rest for floating buffers - gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel); - bufferPool = networkBufferPool.createBufferPool(0, maxNumberOfMemorySegments); + int nrExclusiveMemorySegments = gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel); --- End diff -- Please, no abbreviations like `nrExclusiveMemorySegments`. `assignedExclusiveMemorySegments`? `exclusiveMemorySegments`? > Make buffer count per InputGate always #channels*buffersPerChannel + > ExclusiveBuffers > - > > Key: FLINK-9253 > URL: https://issues.apache.org/jira/browse/FLINK-9253 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Major > Fix For: 1.5.0 > > > The credit-based flow control path assigns exclusive buffers only to remote > channels (which makes sense since local channels don't use any own buffers). > However, this is a bit intransparent with respect to how much data may be in > buffers since this depends on the actual schedule of the job and not the job > graph. 
> By adapting the floating buffers to use a maximum of > {{#channels*buffersPerChannel + floatingBuffersPerGate - #exclusiveBuffers}}, > we would be channel-type agnostic and keep the old behaviour. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
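The per-gate maximum described in the issue is plain arithmetic; a small sketch (method and parameter names invented for illustration) makes the channel-type-agnostic budget explicit:

```java
// Sketch of the buffer budget from the issue description:
//   max = #channels * buffersPerChannel + floatingBuffersPerGate - #exclusiveBuffers
// Under credit-based flow control only remote channels receive exclusive
// buffers, so the unused share of local channels flows into the floating pool
// and the total per gate stays the same regardless of the actual schedule.
public class GateBufferBudget {
    static int maxFloatingBuffers(int numChannels, int buffersPerChannel,
                                  int floatingBuffersPerGate, int exclusiveBuffers) {
        return numChannels * buffersPerChannel + floatingBuffersPerGate - exclusiveBuffers;
    }

    public static void main(String[] args) {
        // 8 channels with 2 buffers each plus 8 floating buffers per gate;
        // 3 remote channels already hold 2 exclusive buffers each (6 total).
        System.out.println(maxFloatingBuffers(8, 2, 8, 6));  // prints 18
    }
}
```

Note that when all channels are local (zero exclusive buffers), the floating pool absorbs the full `#channels * buffersPerChannel + floatingBuffersPerGate` budget, preserving the pre-credit-based behaviour.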
[jira] [Commented] (FLINK-9088) Upgrade Nifi connector dependency to 1.6.0
[ https://issues.apache.org/jira/browse/FLINK-9088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462530#comment-16462530 ] Ted Yu commented on FLINK-9088: --- lgtm > Upgrade Nifi connector dependency to 1.6.0 > -- > > Key: FLINK-9088 > URL: https://issues.apache.org/jira/browse/FLINK-9088 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Ted Yu >Assignee: Hai Zhou >Priority: Major > > Currently dependency of Nifi is 0.6.1 > We should upgrade to 1.6.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8554) Upgrade AWS SDK
[ https://issues.apache.org/jira/browse/FLINK-8554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-8554: -- Description: AWS SDK 1.11.271 fixes a lot of bugs. One of which would exhibit the following: {code} Caused by: java.lang.NullPointerException at com.amazonaws.metrics.AwsSdkMetrics.getRegion(AwsSdkMetrics.java:729) at com.amazonaws.metrics.MetricAdmin.getRegion(MetricAdmin.java:67) at sun.reflect.GeneratedMethodAccessor132.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) {code} was: AWS SDK 1.11.271 fixes a lot of bugs. One of which would exhibit the following: {code} Caused by: java.lang.NullPointerException at com.amazonaws.metrics.AwsSdkMetrics.getRegion(AwsSdkMetrics.java:729) at com.amazonaws.metrics.MetricAdmin.getRegion(MetricAdmin.java:67) at sun.reflect.GeneratedMethodAccessor132.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) {code} > Upgrade AWS SDK > --- > > Key: FLINK-8554 > URL: https://issues.apache.org/jira/browse/FLINK-8554 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Priority: Minor > > AWS SDK 1.11.271 fixes a lot of bugs. > One of which would exhibit the following: > {code} > Caused by: java.lang.NullPointerException > at com.amazonaws.metrics.AwsSdkMetrics.getRegion(AwsSdkMetrics.java:729) > at com.amazonaws.metrics.MetricAdmin.getRegion(MetricAdmin.java:67) > at sun.reflect.GeneratedMethodAccessor132.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9236) Use Apache Parent POM 19
[ https://issues.apache.org/jira/browse/FLINK-9236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-9236: -- Description: Flink is still using Apache Parent POM 18. Apache Parent POM 19 is out. This will also fix Javadoc generation with JDK 10+ was:Flink is still using Apache Parent POM 18. Apache Parent POM 19 is out. This will also fix Javadoc generation with JDK 10+ > Use Apache Parent POM 19 > > > Key: FLINK-9236 > URL: https://issues.apache.org/jira/browse/FLINK-9236 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Assignee: mingleizhang >Priority: Major > > Flink is still using Apache Parent POM 18. Apache Parent POM 19 is out. > This will also fix Javadoc generation with JDK 10+ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8978) End-to-end test: Job upgrade
[ https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462520#comment-16462520 ] ASF GitHub Bot commented on FLINK-8978: --- Github user azagrebin commented on the issue: https://github.com/apache/flink/pull/5947 Thanks for the review and the good points, @StefanRRichter. I updated the PR to address the comments. The resume state e2e test also checks operator <-> state correspondence upon state restoration now. > End-to-end test: Job upgrade > > > Key: FLINK-8978 > URL: https://issues.apache.org/jira/browse/FLINK-8978 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Andrey Zagrebin >Priority: Blocker > Fix For: 1.6.0, 1.5.1 > > > Job upgrades usually happen during the lifetime of a real-world Flink job. > Therefore, we should add an end-to-end test which exactly covers this > scenario. I suggest to do the following: > # Run the general purpose testing job FLINK-8971 > # Take a savepoint > # Modify the job by introducing a new operator and changing the order of > others > # Resume the modified job from the savepoint > # Verify that everything went correctly -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9287) KafkaProducer011 seems to leak threads when not in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-9287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462480#comment-16462480 ] ASF GitHub Bot commented on FLINK-9287: --- GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/5952 [FLINK-9287][kafka] Properly clean up resources in non EXACTLY_ONCE FlinkKafkaProducer011 Previously, FlinkKafkaProducer was not being closed for AT_LEAST_ONCE and NONE semantics when closing FlinkKafkaProducer011. This was leading to resource leaks (for example, an increasing number of active threads). ## Verifying this change This bug fix might be hard to test automatically. This PR adds a new test proposal in a separate commit; however, it might be flaky if there are other tests being executed in parallel. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink f9287 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5952.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5952 commit c9139a90dd6a2afb25a4eb3102e1abedf90d8f5f Author: Piotr Nowojski Date: 2018-05-03T13:50:53Z [FLINK-9287][kafka] Properly clean up resources in non EXACTLY_ONCE FlinkKafkaProducer011 Previously FlinkKafkaProducer was not being closed for AT_LEAST_ONCE and NONE Semantics when closing FlinkKafkaProducer011. This was leading to resources leaking (for example increasing number of active threads) commit 5a1d0962f3fb92a87ee809b5a09106f5c4d05caf Author: Piotr Nowojski Date: 2018-05-03T13:53:40Z [FLINK-9287][kafka] Ensure threads count do not grow in FlinkKafkaProducer011 > KafkaProducer011 seems to leak threads when not in exactly-once mode > > > Key: FLINK-9287 > URL: https://issues.apache.org/jira/browse/FLINK-9287 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.5.0, 1.4.2 >Reporter: Christopher Ng >Priority: Blocker > Fix For: 1.5.0 > > > {{KafkaProducer011}} appears to be leaking {{kafka-producer-network-thread}} > threads. As far as I can tell it happens when it is not in EXACTLY_ONCE > mode, it seems that it creates a {{FlinkKafkaProducer}} but never closes it, > even when the {{FlinkKafkaProducer011}} itself is closed. > I observed this when running a local cluster and submitting and then > cancelling a job, a lot of kafka threads were left alive afterwards. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
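The leak described in this issue and the shape of the fix can be illustrated with a minimal sketch. Every type and method here is an invented stand-in (the real change lives in Flink's `FlinkKafkaProducer011#close`); the point is only that `close()` must also dispose the current internal producer in the non-transactional semantics:

```java
// Minimal sketch of the leak: before the fix, close() only dealt with the
// EXACTLY_ONCE transactional path, so the internal producer (and its
// kafka-producer-network-thread) survived job cancellation in AT_LEAST_ONCE
// and NONE modes. All types here are hypothetical stand-ins.
public class ProducerCloseSketch {
    enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    static class InternalProducer {
        boolean closed = false;
        void close() { closed = true; }  // would release network threads etc.
    }

    final Semantic semantic;
    final InternalProducer currentProducer = new InternalProducer();

    ProducerCloseSketch(Semantic semantic) { this.semantic = semantic; }

    void close() {
        if (semantic != Semantic.EXACTLY_ONCE) {
            // The previously missing branch: non-transactional producers are
            // not closed by any transaction lifecycle, so close them here.
            currentProducer.close();
        }
        // EXACTLY_ONCE producers are closed when their transactions are
        // committed or aborted (elided in this sketch).
    }
}
```

A leak like this typically shows up as monotonically growing `kafka-producer-network-thread` counts across job submissions on a long-lived cluster, which matches the reporter's observation.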
[jira] [Assigned] (FLINK-9287) KafkaProducer011 seems to leak threads when not in exactly-once mode
[ https://issues.apache.org/jira/browse/FLINK-9287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski reassigned FLINK-9287: - Assignee: Piotr Nowojski > KafkaProducer011 seems to leak threads when not in exactly-once mode > > > Key: FLINK-9287 > URL: https://issues.apache.org/jira/browse/FLINK-9287 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.5.0, 1.4.2 >Reporter: Christopher Ng >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.5.0 > > > {{KafkaProducer011}} appears to be leaking {{kafka-producer-network-thread}} > threads. As far as I can tell it happens when it is not in EXACTLY_ONCE > mode, it seems that it creates a {{FlinkKafkaProducer}} but never closes it, > even when the {{FlinkKafkaProducer011}} itself is closed. > I observed this when running a local cluster and submitting and then > cancelling a job, a lot of kafka threads were left alive afterwards. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5952: [FLINK-9287][kafka] Properly clean up resources in...
GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/5952 [FLINK-9287][kafka] Properly clean up resources in non EXACTLY_ONCE FlinkKafkaProducer011 Previously FlinkKafkaProducer was not being closed for AT_LEAST_ONCE and NONE Semantics when closing FlinkKafkaProducer011. This was leading to resources leaking (for example increasing number of active threads). ## Verifying this change This bug fix might be hard to test automatically. This PR adds a new test proposal in separate commit, however it might be flaky if there are other tests being executed in parallel. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink f9287 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5952.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5952 commit c9139a90dd6a2afb25a4eb3102e1abedf90d8f5f Author: Piotr Nowojski Date: 2018-05-03T13:50:53Z [FLINK-9287][kafka] Properly clean up resources in non EXACTLY_ONCE FlinkKafkaProducer011 Previously FlinkKafkaProducer was not being closed for AT_LEAST_ONCE and NONE Semantics when closing FlinkKafkaProducer011. This was leading to resources leaking (for example increasing number of active threads) commit 5a1d0962f3fb92a87ee809b5a09106f5c4d05caf Author: Piotr Nowojski Date: 2018-05-03T13:53:40Z [FLINK-9287][kafka] Ensure threads count do not grow in FlinkKafkaProducer011 ---
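The flakiness concern above comes from counting live threads in a JVM shared with other tests. A minimal, hedged sketch of the kind of check such a test performs (this is illustrative, not the actual test code in the PR):

```java
// Hedged sketch, not the PR's actual test: count live Kafka producer
// network threads before and after exercising a producer, and fail if
// the count grew. In a shared JVM, other tests' threads can interleave,
// which is why such a check can be flaky.
public class ThreadLeakCheck {

    static long producerNetworkThreads() {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.getName().contains("kafka-producer-network-thread"))
                .count();
    }

    public static void main(String[] args) {
        long before = producerNetworkThreads();

        // A real test would open and close a FlinkKafkaProducer011 here;
        // omitted because it requires a running Kafka broker.

        long after = producerNetworkThreads();
        if (after > before) {
            throw new AssertionError("leaked " + (after - before) + " producer threads");
        }
        System.out.println("no leaked producer threads");
    }
}
```

Before the fix, a check like this would fail for AT_LEAST_ONCE and NONE, because the inner producer's network thread outlived the closed sink.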
[jira] [Assigned] (FLINK-9284) Update CLI page
[ https://issues.apache.org/jira/browse/FLINK-9284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang reassigned FLINK-9284: --- Assignee: Triones Deng > Update CLI page > --- > > Key: FLINK-9284 > URL: https://issues.apache.org/jira/browse/FLINK-9284 > Project: Flink > Issue Type: Improvement > Components: Client, Documentation >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Triones Deng >Priority: Critical > Fix For: 1.5.0 > > > The [CLI|https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html] > page must be updated for 1.5. > The > [examples|https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html#examples] > using the {{-m}} option must be updated to use {{8081}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8900) YARN FinalStatus always shows as KILLED with Flip-6
[ https://issues.apache.org/jira/browse/FLINK-8900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-8900. --- > YARN FinalStatus always shows as KILLED with Flip-6 > --- > > Key: FLINK-8900 > URL: https://issues.apache.org/jira/browse/FLINK-8900 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.5.0, 1.6.0 >Reporter: Nico Kruber >Assignee: Stephan Ewen >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Whenever I run a simple simple word count like this one on YARN with Flip-6 > enabled, > {code} > ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c > org.apache.flink.streaming.examples.wordcount.WordCount > ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING > {code} > it will show up as {{KILLED}} in the {{State}} and {{FinalStatus}} columns > even though the program ran successfully like this one (irrespective of > FLINK-8899 occurring or not): > {code} > 2018-03-08 16:48:39,049 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming > WordCount (11a794d2f5dc2955d8015625ec300c20) switched from state RUNNING to > FINISHED. > 2018-03-08 16:48:39,050 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping > checkpoint coordinator for job 11a794d2f5dc2955d8015625ec300c20 > 2018-03-08 16:48:39,050 INFO > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - > Shutting down > 2018-03-08 16:48:39,078 INFO > org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job > 11a794d2f5dc2955d8015625ec300c20 reached globally terminal state FINISHED. > 2018-03-08 16:48:39,151 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager e58efd886429e8f080815ea74ddfa734 at the SlotManager. > 2018-03-08 16:48:39,221 INFO org.apache.flink.runtime.jobmaster.JobMaster > - Stopping the JobMaster for job Streaming > WordCount(11a794d2f5dc2955d8015625ec300c20). 
> 2018-03-08 16:48:39,270 INFO org.apache.flink.runtime.jobmaster.JobMaster > - Close ResourceManager connection > 43f725adaee14987d3ff99380701f52f: JobManager is shutting down.. > 2018-03-08 16:48:39,270 INFO org.apache.flink.yarn.YarnResourceManager > - Disconnect job manager > 0...@akka.tcp://fl...@ip-172-31-7-0.eu-west-1.compute.internal:34281/user/jobmanager_0 > for job 11a794d2f5dc2955d8015625ec300c20 from the resource manager. > 2018-03-08 16:48:39,349 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Suspending > SlotPool. > 2018-03-08 16:48:39,349 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Stopping > SlotPool. > 2018-03-08 16:48:39,349 INFO > org.apache.flink.runtime.jobmaster.JobManagerRunner - > JobManagerRunner already shutdown. > 2018-03-08 16:48:39,775 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager 4e1fb6c8f95685e24b6a4cb4b71ffb92 at the SlotManager. > 2018-03-08 16:48:39,846 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager b5bce0bdfa7fbb0f4a0905cc3ee1c233 at the SlotManager. > 2018-03-08 16:48:39,876 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED > SIGNAL 15: SIGTERM. Shutting down as requested. > 2018-03-08 16:48:39,910 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager a35b0690fdc6ec38bbcbe18a965000fd at the SlotManager. > 2018-03-08 16:48:39,942 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager 5175cabe428bea19230ac056ff2a17bb at the SlotManager. > 2018-03-08 16:48:39,974 INFO org.apache.flink.runtime.blob.BlobServer > - Stopped BLOB server at 0.0.0.0:46511 > 2018-03-08 16:48:39,975 INFO > org.apache.flink.runtime.blob.TransientBlobCache - Shutting down > BLOB cache > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-8900) YARN FinalStatus always shows as KILLED with Flip-6
[ https://issues.apache.org/jira/browse/FLINK-8900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-8900. - Resolution: Fixed Assignee: Stephan Ewen (was: Gary Yao) Fixed in - 1.5.0 via 5b26718404a89744fd4ddcdd963a712ec581222c - 1.6.0 via 545d530747067c805071935ec2bd62083299164b > YARN FinalStatus always shows as KILLED with Flip-6 > --- > > Key: FLINK-8900 > URL: https://issues.apache.org/jira/browse/FLINK-8900 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.5.0, 1.6.0 >Reporter: Nico Kruber >Assignee: Stephan Ewen >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Whenever I run a simple simple word count like this one on YARN with Flip-6 > enabled, > {code} > ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c > org.apache.flink.streaming.examples.wordcount.WordCount > ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING > {code} > it will show up as {{KILLED}} in the {{State}} and {{FinalStatus}} columns > even though the program ran successfully like this one (irrespective of > FLINK-8899 occurring or not): > {code} > 2018-03-08 16:48:39,049 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming > WordCount (11a794d2f5dc2955d8015625ec300c20) switched from state RUNNING to > FINISHED. > 2018-03-08 16:48:39,050 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping > checkpoint coordinator for job 11a794d2f5dc2955d8015625ec300c20 > 2018-03-08 16:48:39,050 INFO > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - > Shutting down > 2018-03-08 16:48:39,078 INFO > org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job > 11a794d2f5dc2955d8015625ec300c20 reached globally terminal state FINISHED. > 2018-03-08 16:48:39,151 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager e58efd886429e8f080815ea74ddfa734 at the SlotManager. 
> 2018-03-08 16:48:39,221 INFO org.apache.flink.runtime.jobmaster.JobMaster > - Stopping the JobMaster for job Streaming > WordCount(11a794d2f5dc2955d8015625ec300c20). > 2018-03-08 16:48:39,270 INFO org.apache.flink.runtime.jobmaster.JobMaster > - Close ResourceManager connection > 43f725adaee14987d3ff99380701f52f: JobManager is shutting down.. > 2018-03-08 16:48:39,270 INFO org.apache.flink.yarn.YarnResourceManager > - Disconnect job manager > 0...@akka.tcp://fl...@ip-172-31-7-0.eu-west-1.compute.internal:34281/user/jobmanager_0 > for job 11a794d2f5dc2955d8015625ec300c20 from the resource manager. > 2018-03-08 16:48:39,349 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Suspending > SlotPool. > 2018-03-08 16:48:39,349 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Stopping > SlotPool. > 2018-03-08 16:48:39,349 INFO > org.apache.flink.runtime.jobmaster.JobManagerRunner - > JobManagerRunner already shutdown. > 2018-03-08 16:48:39,775 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager 4e1fb6c8f95685e24b6a4cb4b71ffb92 at the SlotManager. > 2018-03-08 16:48:39,846 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager b5bce0bdfa7fbb0f4a0905cc3ee1c233 at the SlotManager. > 2018-03-08 16:48:39,876 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED > SIGNAL 15: SIGTERM. Shutting down as requested. > 2018-03-08 16:48:39,910 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager a35b0690fdc6ec38bbcbe18a965000fd at the SlotManager. > 2018-03-08 16:48:39,942 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager 5175cabe428bea19230ac056ff2a17bb at the SlotManager. 
> 2018-03-08 16:48:39,974 INFO org.apache.flink.runtime.blob.BlobServer > - Stopped BLOB server at 0.0.0.0:46511 > 2018-03-08 16:48:39,975 INFO > org.apache.flink.runtime.blob.TransientBlobCache - Shutting down > BLOB cache > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9288) clarify a few points in the event time / watermark docs
[ https://issues.apache.org/jira/browse/FLINK-9288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462401#comment-16462401 ] ASF GitHub Bot commented on FLINK-9288: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5949 > clarify a few points in the event time / watermark docs > --- > > Key: FLINK-9288 > URL: https://issues.apache.org/jira/browse/FLINK-9288 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: David Anderson >Assignee: David Anderson >Priority: Minor > Fix For: 1.5.0 > > > There are a few things that folks often seem to miss when reading the event > time and watermark docs. Adding a couple of sentences and a couple of links > should help. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8900) YARN FinalStatus always shows as KILLED with Flip-6
[ https://issues.apache.org/jira/browse/FLINK-8900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462402#comment-16462402 ] ASF GitHub Bot commented on FLINK-8900: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5944 > YARN FinalStatus always shows as KILLED with Flip-6 > --- > > Key: FLINK-8900 > URL: https://issues.apache.org/jira/browse/FLINK-8900 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.5.0, 1.6.0 >Reporter: Nico Kruber >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Whenever I run a simple simple word count like this one on YARN with Flip-6 > enabled, > {code} > ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c > org.apache.flink.streaming.examples.wordcount.WordCount > ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING > {code} > it will show up as {{KILLED}} in the {{State}} and {{FinalStatus}} columns > even though the program ran successfully like this one (irrespective of > FLINK-8899 occurring or not): > {code} > 2018-03-08 16:48:39,049 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming > WordCount (11a794d2f5dc2955d8015625ec300c20) switched from state RUNNING to > FINISHED. > 2018-03-08 16:48:39,050 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping > checkpoint coordinator for job 11a794d2f5dc2955d8015625ec300c20 > 2018-03-08 16:48:39,050 INFO > org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - > Shutting down > 2018-03-08 16:48:39,078 INFO > org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job > 11a794d2f5dc2955d8015625ec300c20 reached globally terminal state FINISHED. > 2018-03-08 16:48:39,151 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager e58efd886429e8f080815ea74ddfa734 at the SlotManager. 
> 2018-03-08 16:48:39,221 INFO org.apache.flink.runtime.jobmaster.JobMaster > - Stopping the JobMaster for job Streaming > WordCount(11a794d2f5dc2955d8015625ec300c20). > 2018-03-08 16:48:39,270 INFO org.apache.flink.runtime.jobmaster.JobMaster > - Close ResourceManager connection > 43f725adaee14987d3ff99380701f52f: JobManager is shutting down.. > 2018-03-08 16:48:39,270 INFO org.apache.flink.yarn.YarnResourceManager > - Disconnect job manager > 0...@akka.tcp://fl...@ip-172-31-7-0.eu-west-1.compute.internal:34281/user/jobmanager_0 > for job 11a794d2f5dc2955d8015625ec300c20 from the resource manager. > 2018-03-08 16:48:39,349 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Suspending > SlotPool. > 2018-03-08 16:48:39,349 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Stopping > SlotPool. > 2018-03-08 16:48:39,349 INFO > org.apache.flink.runtime.jobmaster.JobManagerRunner - > JobManagerRunner already shutdown. > 2018-03-08 16:48:39,775 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager 4e1fb6c8f95685e24b6a4cb4b71ffb92 at the SlotManager. > 2018-03-08 16:48:39,846 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager b5bce0bdfa7fbb0f4a0905cc3ee1c233 at the SlotManager. > 2018-03-08 16:48:39,876 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED > SIGNAL 15: SIGTERM. Shutting down as requested. > 2018-03-08 16:48:39,910 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager a35b0690fdc6ec38bbcbe18a965000fd at the SlotManager. > 2018-03-08 16:48:39,942 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Register > TaskManager 5175cabe428bea19230ac056ff2a17bb at the SlotManager. 
> 2018-03-08 16:48:39,974 INFO org.apache.flink.runtime.blob.BlobServer > - Stopped BLOB server at 0.0.0.0:46511 > 2018-03-08 16:48:39,975 INFO > org.apache.flink.runtime.blob.TransientBlobCache - Shutting down > BLOB cache > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-9288) clarify a few points in the event time / watermark docs
[ https://issues.apache.org/jira/browse/FLINK-9288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-9288. - Resolution: Fixed Fix Version/s: (was: 1.6.0) Fixed in - 1.5.0 via e06d36ae17dfd9e830d4f4729996cf2939fd5f75 - 1.6.0 via 90855b638caebd1032e699528a3e0bd232b7c95a > clarify a few points in the event time / watermark docs > --- > > Key: FLINK-9288 > URL: https://issues.apache.org/jira/browse/FLINK-9288 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: David Anderson >Assignee: David Anderson >Priority: Minor > Fix For: 1.5.0 > > > There are a few things that folks often seem to miss when reading the event > time and watermark docs. Adding a couple of sentences and a couple of links > should help. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9288) clarify a few points in the event time / watermark docs
[ https://issues.apache.org/jira/browse/FLINK-9288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-9288. --- > clarify a few points in the event time / watermark docs > --- > > Key: FLINK-9288 > URL: https://issues.apache.org/jira/browse/FLINK-9288 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: David Anderson >Assignee: David Anderson >Priority: Minor > Fix For: 1.5.0 > > > There are a few things that folks often seem to miss when reading the event > time and watermark docs. Adding a couple of sentences and a couple of links > should help. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5944 ---
[GitHub] flink pull request #5949: [FLINK-9288][docs] clarify the event time / waterm...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5949 ---
[jira] [Commented] (FLINK-9235) Add Integration test for Flink-Yarn-Kerberos integration for flip-6
[ https://issues.apache.org/jira/browse/FLINK-9235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462393#comment-16462393 ] ASF GitHub Bot commented on FLINK-9235: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5901 @suez1224 Ah, I think the reason this test passes even without your fix in #5896 is that the test doesn't really test submission to a YARN cluster on different machines. The TaskManager runner will pick up the path of the keytab that exists on the local filesystem. > Add Integration test for Flink-Yarn-Kerberos integration for flip-6 > --- > > Key: FLINK-9235 > URL: https://issues.apache.org/jira/browse/FLINK-9235 > Project: Flink > Issue Type: Test >Affects Versions: 1.5.0 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Major > > We need to provide an integration test for flip-6 similar to > YARNSessionFIFOSecuredITCase for the legacy deployment mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5901: [FLINK-9235][Security] Add integration tests for YARN ker...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5901 @suez1224 Ah, I think the reason this test passes even without your fix in #5896 is that the test doesn't really test submission to a YARN cluster on different machines. The TaskManager runner will pick up the path of the keytab that exists on the local filesystem. ---
[jira] [Commented] (FLINK-9207) Client returns SUCCESS(0) return code for canceled job
[ https://issues.apache.org/jira/browse/FLINK-9207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462366#comment-16462366 ] Amit Jain commented on FLINK-9207: -- [~aljoscha] I haven't had time to check on 1.4.x; however, I tested on version 1.3.2 and found the correct return code, i.e., 1 > Client returns SUCCESS(0) return code for canceled job > -- > > Key: FLINK-9207 > URL: https://issues.apache.org/jira/browse/FLINK-9207 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.5.0 > Environment: Version: 1.5.0, Commit : 2af481a >Reporter: Amit Jain >Priority: Major > Fix For: 1.6.0 > > > The Flink client returns a zero return code when a job is deliberately canceled. > Steps to reproduce it: > 1. bin/flink run -p 10 -m yarn-cluster -yjm 1024 -ytm 12288 WordCount.jar > 2. The user externally canceled the job. > 3. The Job Manager marked the job as CANCELED. > 4. Although the client code emits the following logs, it still returns a zero return code. > org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Killed > application application_1523726493647_. > Job schedulers like Airflow would have a hard time detecting whether the > submitted job was canceled or not. > > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462345#comment-16462345 ] Fred Teunissen commented on FLINK-8500: --- [~aljoscha] I have some time that I can spend on this, so yes, I'm interested. I can also update the current PR with the changes, unless you think it's better to open a fresh PR for this. There are now 3 possible approaches: # Kafka {{ConsumerRecord}} as parameter in the Flink API # a new {{ConsumerRecordMetaInfo}} class as parameter for the {{deserialize}} method # extend the interface {{KeyedDeserializationSchema}} with a new method, although I don't know how to give an interface default behavior in Java. I'm leaning toward almost the same approach I used now: creating a new, separate interface {{RichDeserializationSchema}} with a {{deserialize}} method that takes a {{ConsumerRecordMetaInfo}} parameter. Also create a {{RichDeserializationSchemaWrapper}} for implementing the current API for backwards compatibility. Do you think this is the right approach? > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
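The wrapper approach described above can be sketched as follows. All names here (`ConsumerRecordMetaInfo`, `RichDeserializationSchema`, the `wrap` helper) follow the comment's proposal but are illustrative, not the final Flink API:

```java
// Hedged sketch of the proposed design: a richer deserialization
// interface that receives the full record metadata (including the Kafka
// timestamp), plus a wrapper that adapts the legacy interface so
// existing user code keeps working.
public class DeserializationSketch {

    // Carries the Kafka record fields, including the timestamp the
    // legacy interface could not expose.
    static class ConsumerRecordMetaInfo {
        final byte[] key, value;
        final String topic;
        final int partition;
        final long offset, timestamp;

        ConsumerRecordMetaInfo(byte[] key, byte[] value, String topic,
                               int partition, long offset, long timestamp) {
            this.key = key; this.value = value; this.topic = topic;
            this.partition = partition; this.offset = offset; this.timestamp = timestamp;
        }
    }

    interface RichDeserializationSchema<T> {
        T deserialize(ConsumerRecordMetaInfo record);
    }

    // Legacy-style schema: no access to the record timestamp.
    interface KeyedDeserializationSchema<T> {
        T deserialize(byte[] key, byte[] value, String topic, int partition, long offset);
    }

    // Wrapper implementing the current API on top of the new interface,
    // for backwards compatibility.
    static <T> RichDeserializationSchema<T> wrap(KeyedDeserializationSchema<T> legacy) {
        return r -> legacy.deserialize(r.key, r.value, r.topic, r.partition, r.offset);
    }

    public static void main(String[] args) {
        KeyedDeserializationSchema<String> legacy =
                (k, v, t, p, o) -> new String(v) + "@" + o;
        RichDeserializationSchema<String> rich = wrap(legacy);
        String out = rich.deserialize(new ConsumerRecordMetaInfo(
                null, "hello".getBytes(), "topic", 0, 42L, 1_525_000_000_000L));
        if (!out.equals("hello@42")) throw new AssertionError(out);
        System.out.println(out);
    }
}
```

A new schema implementation would target `RichDeserializationSchema` directly and read `record.timestamp`; old schemas go through the wrapper unchanged.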
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462341#comment-16462341 ] ASF GitHub Bot commented on FLINK-9169: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5950#discussion_r185775633 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -589,7 +589,7 @@ private void restoreKeyGroupsInStateHandle() private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException { KeyedBackendSerializationProxy serializationProxy = - new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader); + new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader, false); --- End diff -- Maybe a small comment on this line why we can tolerate the absence of the serializer is helpful for future maintenance. And a matching comment for the other option on the corresponding line in the heap backend. > NPE when restoring from old savepoint and state serializer could not be > deserialized > > > Key: FLINK-9169 > URL: https://issues.apache.org/jira/browse/FLINK-9169 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Till Rohrmann >Assignee: Tzu-Li (Gordon) Tai >Priority: Major > Fix For: 1.5.0 > > > A user reported to have observed the following exception when restoring a > Flink job from a 1.3 savepoint with Flink 1.4. > {code} > 2018-04-02 21:44:18,146 INFO org.apache.flink.runtime.taskmanager.Task > - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65 > 6fa6) switched from RUNNING to FAILED. > java.lang.IllegalStateException: Could not initialize keyed state backend. 
> at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NullPointerException > at > org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58) > at > org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.(RegisteredKeyedBackendStateMetaInfo.java:53) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateB > ackend.java:1216) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeye > dStateBackend.java:1153) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1 > 139) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283) > ... 6 more > {code} > Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create > {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} where the > {{stateSerializer}} can be {{null}}. 
This is not the problem, however, in > {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a > {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}} > where we null check the state serializer. This will then fail with an > indescriptive NPE. > I think the same should happen when resuming with Flink 1.5 from a 1.4 > savepoint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5950#discussion_r185775633 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -589,7 +589,7 @@ private void restoreKeyGroupsInStateHandle() private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException { KeyedBackendSerializationProxy serializationProxy = - new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader); + new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader, false); --- End diff -- Maybe a small comment on this line explaining why we can tolerate the absence of the serializer would be helpful for future maintenance. And a matching comment for the other option on the corresponding line in the heap backend. ---
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462326#comment-16462326 ] ASF GitHub Bot commented on FLINK-9169: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5950#discussion_r185772501 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/ArtificialCNFErrorThrowingClassLoader.java --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.testutils; + +import java.util.Set; + +/** + * Utility classloader used in tests that allows simulating {@link ClassNotFoundException}s for specific classes. 
+ */ +public class ArtificialCNFErrorThrowingClassLoader extends ClassLoader { --- End diff -- `ArtificialCNFExceptionThrowingClassLoader` might be a better fit > NPE when restoring from old savepoint and state serializer could not be > deserialized > > > Key: FLINK-9169 > URL: https://issues.apache.org/jira/browse/FLINK-9169 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Till Rohrmann >Assignee: Tzu-Li (Gordon) Tai >Priority: Major > Fix For: 1.5.0 > > > A user reported to have observed the following exception when restoring a > Flink job from a 1.3 savepoint with Flink 1.4. > {code} > 2018-04-02 21:44:18,146 INFO org.apache.flink.runtime.taskmanager.Task > - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65 > 6fa6) switched from RUNNING to FAILED. > java.lang.IllegalStateException: Could not initialize keyed state backend. > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NullPointerException > at > org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58) > at > org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.(RegisteredKeyedBackendStateMetaInfo.java:53) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateB > ackend.java:1216) > at > 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeye > dStateBackend.java:1153) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1 > 139) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283) > ... 6 more > {code} > Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create > {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} where the > {{stateSerializer}} can be {{null}}. This is not the problem, however, in > {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a > {{RegisteredKeyedBackendStateMetaInfo}} from
[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5950#discussion_r185772501 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/ArtificialCNFErrorThrowingClassLoader.java --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.testutils; + +import java.util.Set; + +/** + * Utility classloader used in tests that allows simulating {@link ClassNotFoundException}s for specific classes. + */ +public class ArtificialCNFErrorThrowingClassLoader extends ClassLoader { --- End diff -- `ArtificialCNFExceptionThrowingClassLoader` might be a better fit ---
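The test utility under review simulates a missing serializer class at restore time. A hedged sketch of what such a classloader might look like, using the reviewer's suggested name (details are illustrative, not the PR's actual code):

```java
import java.util.Set;

// Hedged sketch of the test classloader discussed above: it throws
// ClassNotFoundException for a configured set of class names and
// delegates everything else to the parent, simulating "serializer class
// no longer on the classpath" during savepoint restore.
public class CnfThrowingClassLoader extends ClassLoader {

    private final Set<String> cnfThrowingClassnames;

    public CnfThrowingClassLoader(ClassLoader parent, Set<String> cnfThrowingClassnames) {
        super(parent);
        this.cnfThrowingClassnames = cnfThrowingClassnames;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        if (cnfThrowingClassnames.contains(name)) {
            // Simulate the class being absent from the classpath.
            throw new ClassNotFoundException(name);
        }
        return super.loadClass(name, resolve);
    }

    public static void main(String[] args) {
        ClassLoader cl = new CnfThrowingClassLoader(
                CnfThrowingClassLoader.class.getClassLoader(),
                Set.of("java.util.ArrayList"));
        boolean threw = false;
        try {
            Class.forName("java.util.ArrayList", false, cl);
        } catch (ClassNotFoundException e) {
            threw = true;
        }
        if (!threw) throw new AssertionError("expected ClassNotFoundException");
        System.out.println("simulated CNF for java.util.ArrayList");
    }
}
```

A restore test can install such a loader as the user-code classloader and assert that deserialization of the state meta info fails gracefully instead of with an NPE.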
[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized
[ https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462321#comment-16462321 ]

ASF GitHub Bot commented on FLINK-9169:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5950#discussion_r185771830

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java ---
    @@ -69,7 +69,7 @@
      * written using {@link #writeSerializer(DataOutputView, TypeSerializer)}.
      *
      * If deserialization fails for any reason (corrupted serializer bytes, serializer class
    -* no longer in classpath, serializer class no longer valid, etc.), {@code null} will
    +* no longer in classpath, serializer class no longer valid, etc.), an {@link IOException} is thrown.
    --- End diff --

The comment in the next line should be deleted; it looks like a leftover from the copy-paste.

> NPE when restoring from old savepoint and state serializer could not be deserialized
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-9169
>                 URL: https://issues.apache.org/jira/browse/FLINK-9169
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.4.2
>            Reporter: Till Rohrmann
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Major
>             Fix For: 1.5.0
>
>
> A user reported having observed the following exception when restoring a Flink job from a 1.3 savepoint with Flink 1.4:
> {code}
> 2018-04-02 21:44:18,146 INFO  org.apache.flink.runtime.taskmanager.Task - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d656fa6) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
>     at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
>     ... 6 more
> {code}
> Looking at {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create a {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} whose {{stateSerializer}} can be {{null}}. That by itself is not the problem; however, in {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}}, and its constructor null-checks the state serializer. The restore then fails with an uninformative NPE. The same presumably happens when resuming with Flink 1.5 from a 1.4 savepoint.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
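The uninformative NPE in the trace above comes from a bare null check on a serializer field that can legitimately be null after the old serializer class failed to deserialize. The following is a simplified, self-contained sketch of that failure mode; the helper names mirror Flink's, but this is an illustration, not Flink's actual code:

```java
// Simplified stand-ins illustrating the FLINK-9169 failure mode.
// Names mirror Flink's classes but this is NOT Flink's implementation.
public class RestoreNpeSketch {

    // Stand-in for org.apache.flink.util.Preconditions.checkNotNull:
    // throws a message-less NPE, which is all the user gets to see.
    static <T> T checkNotNull(T reference) {
        if (reference == null) {
            throw new NullPointerException();
        }
        return reference;
    }

    // Stand-in for the restored meta-info snapshot: stateSerializer is null
    // because the old serializer class could not be deserialized.
    static final class Snapshot {
        final String name = "my-keyed-state";
        final Object stateSerializer = null;
    }

    public static void main(String[] args) {
        Snapshot snapshot = new Snapshot();
        try {
            // Mirrors the constructor-time check that blows up during restore.
            checkNotNull(snapshot.stateSerializer);
        } catch (NullPointerException e) {
            // No message and no hint that the real cause was a missing or
            // changed serializer class, hence the call for a descriptive error.
            System.out.println("NPE message: " + e.getMessage());
        }
    }
}
```

This is why the fix discussed in PR #5950 surfaces the deserialization failure itself (per the javadoc change quoted in the review comment, as an IOException) instead of letting a bare null reach a generic precondition check.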